From 2b1a6039fb28dba2ec02a4c6d9c22040a3001c65 Mon Sep 17 00:00:00 2001 From: danielwiehl Date: Fri, 21 Oct 2022 18:30:06 +0200 Subject: [PATCH] perf(platform/messaging): transport intents to subscribed clients only Previously, an intent was transported to all qualified clients regardless of whether the client had a subscription for the intent, causing unnecessary messaging delivery and is a prerequisite for supporting retained intents. closes #124 --- .../publish-message.component.ts | 2 +- .../intent-based-messaging.adoc | 6 +- .../intention-api/intention-api.adoc | 6 +- .../messaging/intent-based-messaging-specs.ts | 4 +- .../src/lib/client/client-connect.script.ts | 4 +- .../lib/client/client-disconnect.script.ts | 6 +- .../manifest-registry/manifest-service.ts | 4 +- .../lib/client/messaging/broker-gateway.ts | 134 +++--- .../src/lib/client/messaging/intent-client.ts | 27 +- .../lib/client/messaging/messaging.script.ts | 72 +++- .../lib/client/messaging/messaging.spec.ts | 141 ++++++- .../messaging/\311\265intent-client.ts" | 12 +- .../src/lib/error.util.ts | 4 +- .../host/client-registry/client.registry.ts | 7 +- .../\311\265client.registry.ts" | 2 + .../manifest-registry.spec.ts | 8 +- .../manifest-registry/manifest-registry.ts | 4 +- .../\311\265manifest-registry.ts" | 6 +- .../intent-param-validator.interceptor.ts | 2 +- .../intent-subscription.registry.ts | 59 +++ .../lib/host/message-broker/message-broker.ts | 351 +++++++++------- .../message-subscription.registry.ts | 112 +++++ .../topic-subscription.registry.spec.ts | 384 +++++++----------- .../topic-subscription.registry.ts | 157 ++----- .../lib/intent-subscription-legacy-support.ts | 41 ++ .../src/lib/messaging.model.ts | 6 +- .../src/lib/microfrontend-platform.ts | 4 +- .../src/lib/operators.ts | 11 +- .../src/lib/version.spec.ts | 4 +- .../src/lib/\311\265messaging.model.ts" | 30 +- 30 files changed, 966 insertions(+), 644 deletions(-) create mode 100644 projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-subscription.registry.ts create mode 100644 projects/scion/microfrontend-platform/src/lib/host/message-broker/message-subscription.registry.ts create mode 100644 projects/scion/microfrontend-platform/src/lib/intent-subscription-legacy-support.ts diff --git a/apps/microfrontend-platform-testing-app/src/app/messaging/publish-message/publish-message.component.ts b/apps/microfrontend-platform-testing-app/src/app/messaging/publish-message/publish-message.component.ts index 275e7ea5..9130e4aa 100644 --- a/apps/microfrontend-platform-testing-app/src/app/messaging/publish-message/publish-message.component.ts +++ b/apps/microfrontend-platform-testing-app/src/app/messaging/publish-message/publish-message.component.ts @@ -183,7 +183,7 @@ export class PublishMessageComponent implements OnDestroy { try { if (requestReply) { this._requestResponseSubscription = this._intentClient.request$({type, qualifier}, message, {headers: headers}) - .pipe(finalize(() => this.markPublishing(true))) + .pipe(finalize(() => this.markPublishing(false))) .subscribe({ next: reply => this.replies.push(reply), error: error => this.publishError = error, diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc index 9fc3887e..16fe35f7 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc @@ -58,7 +58,7 @@ To learn more about an intention, see chapter < [[chapter:intent-based-messaging:issuing-an-intent]] [discrete] === Issuing an Intent -A micro application can issue intents for intentions declared in its manifest. The platform transports the intent to micro applications that provide a fulfilling capability. Along with the intent, you can pass transfer data, either as payload, message headers or parameters. Passed data must be serializable with the _Structured Clone Algorithm_. +A micro application can issue intents for intentions declared in its manifest. The platform transports the intent to micro applications that provide a fulfilling capability. Along with the intent, the application can pass transfer data, either as payload, message headers or parameters. Passed data must be serializable with the _Structured Clone Algorithm_. The following code snippet illustrates starting the checkout wizard via intent. @@ -98,7 +98,7 @@ To learn more about a capability, see chapter <>. +Intents are transported to applications that provide a fulfilling capability and are typically subscribed to in an activator. An activator is a special microfrontend that a micro application can provide to interact with the platform. Activators are loaded when starting the host application and run for the entire application lifecycle. An activator microfrontend is special in that it is never displayed to the user. Learn more about activator in the chapter <>. The following code snippet illustrates how to listen to intents. @@ -109,7 +109,7 @@ include::intent-based-messaging.snippets.ts[tags=handle-intent] <1> Defines the selector to filter intents. Without a selector, you would receive all intents you have defined a capability for. The selector supports the use of wildcards in the qualifier. <2> Subscribes to intents that match the selector. -IMPORTANT: A micro application only receives intents for which it provides a respective capability. The selector is used as a filter only. +IMPORTANT: A micro application only receives intents for which it provides a fulfilling capability. The selector is used as a filter only. [[chapter:intent-based-messaging:request-response-message-exchange-pattern]] [discrete] diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/intention-api/intention-api.adoc b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/intention-api/intention-api.adoc index b59938b5..88d3a7b4 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/intention-api/intention-api.adoc +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/intention-api/intention-api.adoc @@ -158,7 +158,7 @@ The following code snippet shows an example of how an application can subscribe include::intention-api.snippets.ts[tags=intent-handling] ---- -IMPORTANT: A micro application only receives intents for which it provides a respective capability. The selector is used as a filter only. +IMPORTANT: A micro application only receives intents for which it provides a fulfilling capability. The selector is used as a filter only. For more information about handling an intent, see the chapter <> in <>. @@ -252,7 +252,7 @@ include::intention-api.snippets.ts[tags=capability-lookup-toolbar-items] NOTE: To see how to handle intents in the capability providing micro application, see the chapter <> in <>. -By passing a `ManifestObjectFilter` to the `lookupCapabilities$` method, you can control which capabilities to observe. Specified filter criteria are "AND"ed together. If not passing a filter, all capabilities visible to the requesting micro application are observed. When subscribing to the Observable, it emits the requested capabilities. The Observable never completes and emits continuously when satisfying capabilities are registered or unregistered. +By passing a `ManifestObjectFilter` to the `lookupCapabilities$` method, you can control which capabilities to observe. Specified filter criteria are "AND"ed together. If not passing a filter, all capabilities visible to the requesting micro application are observed. When subscribing to the Observable, it emits the requested capabilities. The Observable never completes and emits continuously when fulfilling capabilities are registered or unregistered. IMPORTANT: A micro application can only look up its own capabilities and public capabilities for which it has declared an intention. @@ -290,7 +290,7 @@ a| `string` === Browsing Intentions The platform allows you to browse and observe intentions. Unlike <>, an application can browse the intentions of all micro applications. The use case for browsing intentions is somewhat technical, e.g., used by the _DevTools_ to list intentions declared by micro applications. -You can browse intentions using the `ManifestService.lookupIntentions$` method. By passing a `ManifestObjectFilter` to the `lookupIntentions$` method, you can control which intentions to observe. Specified filter criteria are "AND"ed together. If not passing a filter, all intentions are observed. When subscribing to the Observable, it emits the requested intentions. The Observable never completes and emits continuously when satisfying intentions are registered or unregistered. +You can browse intentions using the `ManifestService.lookupIntentions$` method. By passing a `ManifestObjectFilter` to the `lookupIntentions$` method, you can control which intentions to observe. Specified filter criteria are "AND"ed together. If not passing a filter, all intentions are observed. When subscribing to the Observable, it emits the requested intentions. The Observable never completes and emits continuously when matching intentions are registered or unregistered. .Properties of `ManifestObjectFilter` to filter intentions [cols="1,2,1,8"] diff --git a/projects/scion/microfrontend-platform.e2e/src/messaging/intent-based-messaging-specs.ts b/projects/scion/microfrontend-platform.e2e/src/messaging/intent-based-messaging-specs.ts index 71a2c274..43b53b1a 100644 --- a/projects/scion/microfrontend-platform.e2e/src/messaging/intent-based-messaging-specs.ts +++ b/projects/scion/microfrontend-platform.e2e/src/messaging/intent-based-messaging-specs.ts @@ -40,7 +40,7 @@ export namespace IntendBasedMessagingSpecs { } /** - * Tests that an intent can only be issued if there is one application at minimum providing a respective capability. + * Tests that an intent can only be issued if there is one application at minimum providing a fulfilling capability. */ export async function intentNotFulfilledSpec(testingAppPO: TestingAppPO): Promise { const pagePOs = await testingAppPO.navigateTo({ @@ -935,7 +935,7 @@ export namespace IntendBasedMessagingSpecs { } /** - * Tests that the platform resolves to the satisfying capability. + * Tests that the platform resolves to the fulfilling capability. */ export async function resolveCapabilitySpec(testingAppPO: TestingAppPO): Promise { const pagePOs = await testingAppPO.navigateTo({ diff --git a/projects/scion/microfrontend-platform/src/lib/client/client-connect.script.ts b/projects/scion/microfrontend-platform/src/lib/client/client-connect.script.ts index c88cd45b..fd676ea5 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/client-connect.script.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/client-connect.script.ts @@ -18,7 +18,7 @@ import {filterByTransport} from '../operators'; export async function connectToHost({symbolicName, brokerDiscoverTimeout, connectCount}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef await MicrofrontendPlatform.connectToHost(symbolicName, {brokerDiscoverTimeout}); - observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId); + observer.next(Beans.get(ɵBrokerGateway).session.clientId); for (let i = 1; i < connectCount; i++) { const {clientId} = await Beans.get(ɵBrokerGateway).connectToBroker(); @@ -29,7 +29,7 @@ export async function connectToHost({symbolicName, brokerDiscoverTimeout, connec export async function connectClientToRemoteHost({symbolicName, brokerDiscoverTimeout}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef Beans.register(ɵWINDOW_TOP, {useValue: window}); // simulate to be loaded into the top-level window await MicrofrontendPlatform.connectToHost(symbolicName, {brokerDiscoverTimeout}); - observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId); + observer.next(Beans.get(ɵBrokerGateway).session.clientId); } /** diff --git a/projects/scion/microfrontend-platform/src/lib/client/client-disconnect.script.ts b/projects/scion/microfrontend-platform/src/lib/client/client-disconnect.script.ts index b3f08ede..7f26b074 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/client-disconnect.script.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/client-disconnect.script.ts @@ -23,18 +23,18 @@ export async function connectToHost({symbolicName, disconnectOnUnloadDisabled = Beans.register(VERSION, {useValue: version}); } await MicrofrontendPlatform.connectToHost(symbolicName); - observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId); + observer.next(Beans.get(ɵBrokerGateway).session.clientId); } export async function connectToHostThenStopPlatform({symbolicName}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef await MicrofrontendPlatform.connectToHost(symbolicName); - observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId); + observer.next(Beans.get(ɵBrokerGateway).session.clientId); await MicrofrontendPlatform.destroy(); } export async function connectToHostThenLocationHref({symbolicName, locationHref}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef await MicrofrontendPlatform.connectToHost(symbolicName); - observer.next(Beans.get(ɵBrokerGateway).brokerInfo.clientId); + observer.next(Beans.get(ɵBrokerGateway).session.clientId); window.location.href = locationHref; } diff --git a/projects/scion/microfrontend-platform/src/lib/client/manifest-registry/manifest-service.ts b/projects/scion/microfrontend-platform/src/lib/client/manifest-registry/manifest-service.ts index 0b1695fb..cd0bdb21 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/manifest-registry/manifest-service.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/manifest-registry/manifest-service.ts @@ -83,7 +83,7 @@ export class ManifestService implements Initializer { * to match any value, e.g., `{property: '*'}`, or partial matching to find capabilities with at least the specified qualifier * properties. Partial matching is enabled by appending the _any-more_ entry to the qualifier, as following: `{'*': '*'}`. * @return An Observable that, when subscribed, emits the requested capabilities. - * It never completes and emits continuously when satisfying capabilities are registered or unregistered. + * It never completes and emits continuously when fulfilling capabilities are registered or unregistered. */ public lookupCapabilities$(filter?: ManifestObjectFilter): Observable { return Beans.get(MessageClient).request$(ManifestRegistryTopics.LookupCapabilities, filter) @@ -100,7 +100,7 @@ export class ManifestService implements Initializer { * to match any value, e.g., `{property: '*'}`, or partial matching to find intentions with at least the specified qualifier * properties. Partial matching is enabled by appending the _any-more_ entry to the qualifier, as following: `{'*': '*'}`. * @return An Observable that, when subscribed, emits the requested intentions. - * It never completes and emits continuously when satisfying intentions are registered or unregistered. + * It never completes and emits continuously when matching intentions are registered or unregistered. */ public lookupIntentions$(filter?: ManifestObjectFilter): Observable { return Beans.get(MessageClient).request$(ManifestRegistryTopics.LookupIntentions, filter) diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/broker-gateway.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/broker-gateway.ts index 81452b8b..db32fc92 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/messaging/broker-gateway.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/broker-gateway.ts @@ -8,9 +8,9 @@ * SPDX-License-Identifier: EPL-2.0 */ import {AsyncSubject, EMPTY, firstValueFrom, fromEvent, interval, lastValueFrom, merge, MonoTypeOperatorFunction, NEVER, noop, Observable, Observer, of, ReplaySubject, Subject, TeardownLogic, throwError, timeout, timer} from 'rxjs'; -import {ConnackMessage, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, TopicSubscribeCommand, TopicUnsubscribeCommand} from '../../ɵmessaging.model'; +import {ConnackMessage, IntentSubscribeCommand, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, SubscribeCommand, TopicSubscribeCommand, UnsubscribeCommand} from '../../ɵmessaging.model'; import {finalize, map, mergeMap, take, takeUntil, tap} from 'rxjs/operators'; -import {filterByChannel, filterByMessageHeader, filterByOrigin, filterByTopicChannel, filterByTransport, pluckMessage} from '../../operators'; +import {filterByChannel, filterByMessageHeader, filterByOrigin, filterByTopicChannel, filterByTransport, filterByWindow, pluckMessage} from '../../operators'; import {UUID} from '@scion/toolkit/uuid'; import {IntentMessage, Message, MessageHeaders, TopicMessage} from '../../messaging.model'; import {Logger, NULL_LOGGER} from '../../logger'; @@ -24,6 +24,7 @@ import {MessageClient} from '../../client/messaging/message-client'; import {runSafe} from '../../safe-runner'; import {VERSION} from '../../version'; import {stringifyError} from '../../error.util'; +import {IntentSelector} from './intent-client'; /** * The gateway is responsible for dispatching messages between the client and the broker. @@ -59,6 +60,12 @@ export abstract class BrokerGateway { */ public abstract subscribeToTopic$(topic: string): Observable>; + /** + * Subscribes to intents that match the specified selector and for which the application provides a fulfilling capability. + * The Observable never completes. + */ + public abstract subscribeToIntent$(selector?: IntentSelector): Observable>; + /** * An Observable that emits when a message from the message broker is received. */ @@ -97,6 +104,10 @@ export class NullBrokerGateway implements BrokerGateway { public subscribeToTopic$(topic: string): Observable> { return NEVER; } + + public subscribeToIntent$(selector: IntentSelector): Observable> { + return NEVER; + } } /** @@ -113,8 +124,8 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { private _appSymbolicName: string; private _brokerDiscoverTimeout: number; private _messageDeliveryTimeout: number; - private _brokerInfo: BrokerInfo | null = null; - private _brokerInfo$ = new AsyncSubject(); + private _session: Session | null = null; + private _session$ = new AsyncSubject(); public readonly message$ = new Subject>(); @@ -126,25 +137,25 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { public async init(): Promise { try { - const brokerInfo = await this.connectToBroker(); - this.installBrokerMessageListener(brokerInfo); - this.installHeartbeatPublisher(brokerInfo); - this._brokerInfo = brokerInfo; - this._brokerInfo$.next(brokerInfo); - this._brokerInfo$.complete(); + const session = await this.connectToBroker(); + this.installBrokerMessageListener(session); + this.installHeartbeatPublisher(session); + this._session = session; + this._session$.next(session); + this._session$.complete(); } catch (error) { - this._brokerInfo$.error(error); + this._session$.error(error); throw error; } } public isConnected(): Promise { - return lastValueFrom(this._brokerInfo$).then(() => true).catch(() => false); + return lastValueFrom(this._session$).then(() => true).catch(() => false); } - public get brokerInfo(): BrokerInfo | null { - return this._brokerInfo; + public get session(): Session | null { + return this._session; } public async postMessage(channel: MessagingChannel, message: Message): Promise { @@ -154,7 +165,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { // If not connected to the broker, wait until connected. If connected, continue execution immediately // without spawning a microtask. Otherwise, messages cannot be published during platform shutdown. - const brokerInfo = this._brokerInfo || await lastValueFrom(this._brokerInfo$); + const session = this._session || await lastValueFrom(this._session$); const messageId = UUID.randomUUID(); const envelope: MessageEnvelope = { @@ -166,7 +177,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { .set(MessageHeaders.MessageId, messageId) .set(MessageHeaders.Timestamp, Date.now()) .set(MessageHeaders.AppSymbolicName, this._appSymbolicName) - .set(MessageHeaders.ClientId, brokerInfo.clientId); + .set(MessageHeaders.ClientId, session.clientId); // Install Promise that resolves once the broker has acknowledged the message, or that rejects otherwise. const postError$ = new Subject(); @@ -187,7 +198,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { }); try { - brokerInfo.window.postMessage(envelope, brokerInfo.origin); + session.broker.window.postMessage(envelope, session.broker.origin); } catch (error) { postError$.error(error); @@ -228,7 +239,28 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { } public subscribeToTopic$(topic: string): Observable> { - return new Observable((observer: Observer): TeardownLogic => { + return this.subscribe$((subscriberId: string): TopicSubscribeCommand => ({topic, subscriberId, headers: new Map()}), { + messageChannel: MessagingChannel.Topic, + subscribeChannel: MessagingChannel.TopicSubscribe, + unsubscribeChannel: MessagingChannel.TopicUnsubscribe, + }); + } + + public subscribeToIntent$(selector?: IntentSelector): Observable> { + return this.subscribe$((subscriberId: string): IntentSubscribeCommand => ({selector, subscriberId, headers: new Map()}), { + messageChannel: MessagingChannel.Intent, + subscribeChannel: MessagingChannel.IntentSubscribe, + unsubscribeChannel: MessagingChannel.IntentUnsubscribe, + }); + } + + /** + * Subscribes to described destination, unless the platform has been stopped at the time of subscription. + */ + private subscribe$(produceSubscribeCommand: (subscriberId: string) => SubscribeCommand, descriptor: {messageChannel: MessagingChannel; subscribeChannel: MessagingChannel; unsubscribeChannel: MessagingChannel}): Observable { + const {messageChannel, subscribeChannel, unsubscribeChannel} = descriptor; + + return new Observable((observer: Observer): TeardownLogic => { if (isPlatformStopped()) { observer.error(GatewayErrors.PLATFORM_STOPPED_ERROR); return noop; @@ -237,25 +269,25 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { const subscriberId = UUID.randomUUID(); const unsubscribe$ = new Subject(); const subscribeError$ = new Subject(); + const subscribeCommand: SubscribeCommand = produceSubscribeCommand(subscriberId); - // Receive messages sent to the given topic. + // Receive messages of given subscription. merge(this.message$, subscribeError$) .pipe( - filterByChannel(MessagingChannel.Topic), - filterByMessageHeader({key: MessageHeaders.ɵTopicSubscriberId, value: subscriberId}), + filterByChannel(messageChannel), + filterByMessageHeader({name: MessageHeaders.ɵSubscriberId, value: subscriberId}), pluckMessage(), takeUntil(merge(this._platformStopping$, unsubscribe$)), - finalize(() => this.unsubscribeFromTopic(topic, subscriberId)), + finalize(() => this.unsubscribe({unsubscribeChannel, subscriberId, logContext: JSON.stringify(subscribeCommand)})), ) .subscribe({ - next: reply => observer.next(reply), + next: message => observer.next(message), error: error => observer.error(error), complete: noop, // As per the API, the Observable never completes. }); - // Post the topic subscription to the broker. - const topicSubscribeMessage: TopicSubscribeCommand = {subscriberId, topic, headers: new Map()}; - this.postMessage(MessagingChannel.TopicSubscribe, topicSubscribeMessage) + // Post the subscription to the broker. + this.postMessage(subscribeChannel, subscribeCommand) .catch(error => subscribeError$.error(error)); return (): void => unsubscribe$.next(); @@ -263,19 +295,20 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { } /** - * Unsubscribes given topic subscription. Does nothing if the platform is stopped. + * Unsubscribes from described destination. Does nothing if the platform is stopped. */ - private async unsubscribeFromTopic(topic: string, subscriberId: string): Promise { + private async unsubscribe(descriptor: {unsubscribeChannel: MessagingChannel; subscriberId: string; logContext: string}): Promise { if (isPlatformStopped()) { return; } - const topicUnsubscribeCommand: TopicUnsubscribeCommand = {subscriberId, headers: new Map()}; + const {unsubscribeChannel, subscriberId, logContext} = descriptor; + const unsubscribeCommand: UnsubscribeCommand = {subscriberId, headers: new Map()}; try { - await this.postMessage(MessagingChannel.TopicUnsubscribe, topicUnsubscribeCommand); + await this.postMessage(unsubscribeChannel, unsubscribeCommand); } catch (error) { - Beans.get(Logger, {orElseGet: NULL_LOGGER}).error(`[TopicUnsubscribeError] Failed to unsubscribe from topic '${topic}'. Caused by: ${error}`); // Fall back using NULL_LOGGER, e.g., when the platform is stopping. + Beans.get(Logger, {orElseGet: NULL_LOGGER}).error(`[UnsubscribeError] Failed to unsubscribe from destination: '${logContext}'. Caused by: ${error}`); // Fall back using NULL_LOGGER, e.g., when the platform is stopping. } } @@ -283,10 +316,11 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { * Subscribes to messages sent to this client. * Messages are dispatched to {@link message$}. */ - private installBrokerMessageListener(brokerInfo: BrokerInfo): void { + private installBrokerMessageListener(session: Session): void { fromEvent(window, 'message') .pipe( - filterByOrigin(brokerInfo.origin), + filterByWindow(session.broker.window), + filterByOrigin(session.broker.origin), filterByTransport(MessagingTransport.BrokerToClient), fixMapObjects(), takeUntil(this._platformStopping$), @@ -299,14 +333,14 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { * * Note that no heartbeat scheduler is installed if running in the context of the host application. */ - private installHeartbeatPublisher(brokerInfo: BrokerInfo): void { + private installHeartbeatPublisher(session: Session): void { if (Beans.get(IS_PLATFORM_HOST)) { return; // The host app client does not send a heartbeat. } - interval(brokerInfo.heartbeatInterval) + interval(session.heartbeatInterval) .pipe(takeUntil(this._platformStopping$)) .subscribe(() => runSafe(() => { - Beans.get(MessageClient).publish(PlatformTopics.heartbeat(brokerInfo.clientId)).then(); + Beans.get(MessageClient).publish(PlatformTopics.heartbeat(session.clientId)).then(); })); } @@ -316,10 +350,10 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { * When the broker receives the CONNECT message and trusts this client, the broker responds with a CONNACK message, * or rejects the connect attempt otherwise. * - * @return A Promise that, when connected, resolves to information about the broker, or that rejects if the connect attempt + * @return A Promise that, when connected, resolves to information about the connected client and broker, or that rejects if the connect attempt * failed, either because the broker could not be found or because the application is not allowed to connect. */ - public connectToBroker(): Promise { + public connectToBroker(): Promise { const replyTo = UUID.randomUUID(); const connectPromise = firstValueFrom(fromEvent(window, 'message') .pipe( @@ -330,11 +364,13 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { if (response?.returnCode !== 'accepted') { return throwError(() => `${response?.returnMessage ?? 'UNEXPECTED: Empty broker discovery response'} [code: '${response?.returnCode ?? 'n/a'}']`); } - return of({ + return of({ clientId: response.clientId!, heartbeatInterval: response.heartbeatInterval!, - window: messageEvent.source as Window, - origin: messageEvent.origin, + broker: { + window: messageEvent.source as Window, + origin: messageEvent.origin, + }, }); }), timeout({first: this._brokerDiscoverTimeout, with: () => throwError(() => GatewayErrors.BROKER_DISCOVER_ERROR(this._brokerDiscoverTimeout))}), @@ -382,7 +418,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { * a warning, but not thrown. */ private disconnectFromBroker(): void { - if (!this._brokerInfo) { + if (!this._session) { return; } @@ -394,12 +430,12 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer { .set(MessageHeaders.MessageId, UUID.randomUUID()) .set(MessageHeaders.Timestamp, Date.now()) .set(MessageHeaders.AppSymbolicName, this._appSymbolicName) - .set(MessageHeaders.ClientId, this._brokerInfo.clientId), + .set(MessageHeaders.ClientId, this._session.clientId), }, }; try { - this._brokerInfo.window.postMessage(disconnectMessage, this._brokerInfo.origin); + this._session.broker.window.postMessage(disconnectMessage, this._session.broker.origin); } catch (error) { Beans.get(Logger, {orElseGet: NULL_LOGGER}).warn(`[ClientDisconnectError] Failed to disconnect from the broker. Caused by: ${error}`); @@ -478,15 +514,17 @@ function isPlatformStopped(): boolean { } /** - * Information about the broker. + * Session created after successful connection with the broker. * * @ignore */ -interface BrokerInfo { +interface Session { clientId: string; heartbeatInterval: number; - origin: string; - window: Window; + broker: { + origin: string; + window: Window; + }; } /** @ignore*/ diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts index 24a4b4ce..f5dc3525 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts @@ -21,9 +21,8 @@ import {Qualifier} from '../../platform.model'; * * Like topic-based communication, intent-based communication implements the pub/sub (publish/subscribe) messaging pattern, but is, * in contrast, more restrictive when sending messages. Sending messages is also referred to as issuing intents. It requires the sending - * application to declare an intention in its manifest. Intents can only be issued if there is at least one fulfilling capability - * present in the platform to handle the intent. The platform transports intents exclusively to micro applications that provide a - * fulfilling capability via their manifest. + * application to declare an intention in its manifest. Intents are received only by applications that provide a fulfilling capability. + * If no application provides a fulfilling capability, the platform rejects the intent. * * The communication is built on top of the native `postMessage` mechanism. The host app acts as message broker. * @@ -46,11 +45,11 @@ import {Qualifier} from '../../platform.model'; export abstract class IntentClient { /** - * Issues an intent. + * Sends an intent. * - * A micro application can issue intents for intentions declared in its manifest. The platform transports the intent to micro applications - * that provide a fulfilling capability. Along with the intent, you can pass transfer data, either as payload, message headers or parameters. - * Passed data must be serializable with the Structured Clone Algorithm. + * A micro application can send intents for intentions declared in its manifest. The platform transports the intent to micro applications + * that provide a fulfilling capability. Along with the intent, the application can pass transfer data, either as payload, message headers + * or parameters. Passed data must be serializable with the Structured Clone Algorithm. * * A micro application is implicitly qualified to interact with capabilities that it provides; thus, it must not declare an intention. * @@ -64,11 +63,11 @@ export abstract class IntentClient { public abstract publish(intent: Intent, body?: T, options?: IntentOptions): Promise; /** - * Issues an intent and receives one or more replies. + * Sends an intent and receives one or more replies. * - * A micro application can issue intents for intentions declared in its manifest. The platform transports the intent to micro applications - * that provide a fulfilling capability. Along with the intent, you can pass transfer data, either as payload, message headers or parameters. - * Passed data must be serializable with the Structured Clone Algorithm. + * A micro application can send intents for intentions declared in its manifest. The platform transports the intent to micro applications + * that provide a fulfilling capability. Along with the intent, the application can pass transfer data, either as payload, message headers + * or parameters. Passed data must be serializable with the Structured Clone Algorithm. * * A micro application is implicitly qualified to interact with capabilities that it provides; thus, it must not declare an intention. * @@ -86,9 +85,9 @@ export abstract class IntentClient { /** * Receives an intent when some micro application wants to collaborate with this micro application. * - * Intents are typically handled in an activator. Refer to {@link ActivatorCapability} for more information. + * Intents are typically subscribed to in an activator. Refer to {@link ActivatorCapability} for more information. * - * The micro application receives only intents for which it provides a fulfilling capability through its manifest. + * The micro application receives only intents for which it provides a fulfilling capability. * You can filter received intents by passing a selector. The selector supports the use of wildcards. * * If the received intent has the {@link MessageHeaders.ReplyTo} header field set, the publisher expects the receiver to send one or more @@ -125,7 +124,7 @@ export abstract class IntentClient { * * * - * @return An Observable that emits intents for which this application provides a satisfying capability. It never completes. + * @return An Observable that emits received intents. It never completes. */ public abstract observe$(selector?: IntentSelector): Observable>; diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.script.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.script.ts index fbada789..293baa9f 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.script.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.script.ts @@ -12,7 +12,13 @@ import {MicrofrontendPlatform} from '../../microfrontend-platform'; import {PlatformState} from '../../platform-state'; import {Beans, PreDestroy} from '@scion/toolkit/bean-manager'; import {MessageClient} from './message-client'; -import {fromEvent} from 'rxjs'; +import {IntentClient} from './intent-client'; +import {fromEvent, Observer} from 'rxjs'; +import {filterByChannel, filterByOrigin, filterByTransport, filterByWindow} from '../../operators'; +import {MessagingChannel, MessagingTransport} from '../../ɵmessaging.model'; +import {ɵBrokerGateway} from './broker-gateway'; +import {map} from 'rxjs/operators'; +import {TopicMessage} from '../../messaging.model'; export async function connectToHost({symbolicName}): Promise { // eslint-disable-line @typescript-eslint/typedef await MicrofrontendPlatform.connectToHost(symbolicName); @@ -49,3 +55,67 @@ export async function sendMessageInUnload({symbolicName}): Promise { // es Beans.get(MessageClient).publish(`${symbolicName}/unload`, 'message from client'); }); } + +export async function subscribeToTopic({symbolicName, topic}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef + await MicrofrontendPlatform.connectToHost(symbolicName); + + Beans.get(MessageClient).observe$(topic).subscribe(); + + const session = Beans.get(ɵBrokerGateway).session; + fromEvent(window, 'message') + .pipe( + filterByWindow(session.broker.window), + filterByOrigin(session.broker.origin), + filterByTransport(MessagingTransport.BrokerToClient), + filterByChannel(MessagingChannel.Topic), + map(envelope => envelope.data.message), + ) + .subscribe(observer); +} + +export async function monitorTopicMessageChannel({symbolicName}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef + await MicrofrontendPlatform.connectToHost(symbolicName); + + const session = Beans.get(ɵBrokerGateway).session; + fromEvent(window, 'message') + .pipe( + filterByWindow(session.broker.window), + filterByOrigin(session.broker.origin), + filterByTransport(MessagingTransport.BrokerToClient), + filterByChannel(MessagingChannel.Topic), + map(envelope => envelope.data.message), + ) + .subscribe(observer); +} + +export async function subscribeToIntent({symbolicName, intent}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef + await MicrofrontendPlatform.connectToHost(symbolicName); + + Beans.get(IntentClient).observe$(intent).subscribe(); + + const session = Beans.get(ɵBrokerGateway).session; + fromEvent(window, 'message') + .pipe( + filterByWindow(session.broker.window), + filterByOrigin(session.broker.origin), + filterByTransport(MessagingTransport.BrokerToClient), + filterByChannel(MessagingChannel.Intent), + map(envelope => envelope.data.message), + ) + .subscribe(observer); +} + +export async function monitorIntentMessageChannel({symbolicName}, observer: Observer): Promise { // eslint-disable-line @typescript-eslint/typedef + await MicrofrontendPlatform.connectToHost(symbolicName); + + const session = Beans.get(ɵBrokerGateway).session; + fromEvent(window, 'message') + .pipe( + filterByWindow(session.broker.window), + filterByOrigin(session.broker.origin), + filterByTransport(MessagingTransport.BrokerToClient), + filterByChannel(MessagingChannel.Intent), + map(envelope => envelope.data.message), + ) + .subscribe(observer); +} diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.spec.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.spec.ts index 7ab5ff18..46fb7840 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.spec.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/messaging.spec.ts @@ -11,7 +11,7 @@ import {Subject} from 'rxjs'; import {IntentMessage, MessageHeaders, ResponseStatusCodes, TopicMessage} from '../../messaging.model'; import {MessageClient, takeUntilUnsubscribe} from './message-client'; import {IntentClient} from './intent-client'; -import {expectEmissions, expectPromise, getLoggerSpy, installLoggerSpies, readConsoleLog, resetLoggerSpy, waitForCondition, waitUntilSubscriberCount} from '../../testing/spec.util.spec'; +import {expectEmissions, expectPromise, getLoggerSpy, installLoggerSpies, readConsoleLog, resetLoggerSpy, waitForCondition, waitUntilStable, waitUntilSubscriberCount} from '../../testing/spec.util.spec'; import {MicrofrontendPlatform} from '../../microfrontend-platform'; import {ClientRegistry} from '../../host/client-registry/client.registry'; import {Beans} from '@scion/toolkit/bean-manager'; @@ -585,7 +585,7 @@ describe('Messaging', () => { const replyCaptor = new ObserveCaptor(); Beans.get(IntentClient).request$({type: 'some-type'}, 'ping').subscribe(replyCaptor); await replyCaptor.waitUntilCompletedOrErrored(); - expect(replyCaptor.getError()).toEqual('[RequestReplyError] No client is currently running which could answer the intent \'{type=some-type, qualifier=undefined}\'.'); + expect(replyCaptor.getError()).toEqual('[RequestReplyError] No subscriber registered to answer the intent [intent={"type":"some-type"}]'); }); it('should reject a \'request-response\' topic message if no replier is found', async () => { @@ -597,7 +597,7 @@ describe('Messaging', () => { expect(replyCaptor.getValues()).withContext('emissions').toEqual([]); expect(replyCaptor.hasCompleted()).withContext('hasCompleted').toBeFalse(); expect(replyCaptor.hasErrored()).withContext('hasErrored').toBeTrue(); - expect(replyCaptor.getError()).toEqual('[RequestReplyError] No client is currently running which could answer the request sent to the topic \'some-topic\'.'); + expect(replyCaptor.getError()).toEqual('[RequestReplyError] No subscriber registered to answer the request [topic=some-topic]'); }); it('should allow an interceptor to handle a \'request-response\' intent message if no replier is running', async () => { @@ -661,7 +661,7 @@ describe('Messaging', () => { expect(replyCaptor.hasErrored()).withContext('hasErrored').toBeFalse(); }); - it('should reject an intent if no application provides a satisfying capability', async () => { + it('should reject an intent if no application provides a fulfilling capability', async () => { await MicrofrontendPlatform.startHost({ host: { manifest: { @@ -1137,6 +1137,103 @@ describe('Messaging', () => { await expectEmissions(intentCaptor2).toEqual([capabilityId]); }); + it('should transport topic message to subscribed client(s) only', async () => { + await MicrofrontendPlatform.startHost({ + applications: [ + { + symbolicName: 'client', + manifestUrl: new ManifestFixture({name: 'Client'}).serve(), + }, + ], + }); + + // Mount client that monitors the topic message channel and subscribes to the test topic. + const microfrontend1 = registerFixture(new MicrofrontendFixture()); + await microfrontend1.insertIframe().loadScript('./lib/client/messaging/messaging.script.ts', 'subscribeToTopic', {symbolicName: 'client', topic: 'test/topic'}); + const microfrontend1TopicMessageChannel = new ObserveCaptor(); + + // Mount client that monitors the topic message channel but DOES NOT subscribe to the test topic. + const microfrontend2 = registerFixture(new MicrofrontendFixture()); + await microfrontend2.insertIframe().loadScript('./lib/client/messaging/messaging.script.ts', 'monitorTopicMessageChannel', {symbolicName: 'client'}); + const microfrontend2TopicMessageChannel = new ObserveCaptor(); + + // Publish message to the test topic. + await Beans.get(MessageClient).publish('test/topic', 'Topic message'); + + // Capture messages transported to the clients on the topic message channel. + microfrontend1.message$.subscribe(microfrontend1TopicMessageChannel); + await waitUntilStable(() => microfrontend1TopicMessageChannel.getValues()); + microfrontend2.message$.subscribe(microfrontend2TopicMessageChannel); + await waitUntilStable(() => microfrontend2TopicMessageChannel.getValues()); + + // Expect message to be transported to client 1 because subscribed to the topic. + expect(microfrontend1TopicMessageChannel.getValues()).toContain(jasmine.objectContaining({ + topic: 'test/topic', + body: 'Topic message', + })); + + // Expect message NOT to be transported to client 2 because not subscribed to the topic. + expect(microfrontend2TopicMessageChannel.getValues()).not.toContain(jasmine.objectContaining({ + topic: 'test/topic', + body: 'Topic message', + })); + }); + + it('should transport intent message to subscribed client(s) only', async () => { + await MicrofrontendPlatform.startHost({ + host: { + manifest: { + name: 'Host Application', + intentions: [ + {type: 'testee'}, + ], + }, + }, + applications: [ + { + symbolicName: 'client', + manifestUrl: new ManifestFixture({ + name: 'Client', + capabilities: [ + {type: 'testee', private: false}, + ], + }).serve(), + }, + ], + }); + + // Mount client that monitors the intent message channel and subscribes to the test intent. + const microfrontend1 = registerFixture(new MicrofrontendFixture()); + await microfrontend1.insertIframe().loadScript('./lib/client/messaging/messaging.script.ts', 'subscribeToIntent', {symbolicName: 'client', intent: {type: 'testee'}}); + const microfrontend1IntentMessageChannel = new ObserveCaptor(); + + // Mount client that monitors the intent message channel but DOES NOT subscribe to the test intent. + const microfrontend2 = registerFixture(new MicrofrontendFixture()); + await microfrontend2.insertIframe().loadScript('./lib/client/messaging/messaging.script.ts', 'monitorIntentMessageChannel', {symbolicName: 'client'}); + const microfrontend2IntentMessageChannel = new ObserveCaptor(); + + // Publish test intent. + await Beans.get(IntentClient).publish({type: 'testee'}, 'Intent message'); + + // Capture messages transported to the clients on the intent message channel. + microfrontend1.message$.subscribe(microfrontend1IntentMessageChannel); + await waitUntilStable(() => microfrontend1IntentMessageChannel.getValues()); + microfrontend2.message$.subscribe(microfrontend2IntentMessageChannel); + await waitUntilStable(() => microfrontend2IntentMessageChannel.getValues()); + + // Expect message to be transported to client 1 because subscribed to the intent. + expect(microfrontend1IntentMessageChannel.getValues()).toContain(jasmine.objectContaining({ + intent: jasmine.objectContaining({type: 'testee'}), + body: 'Intent message', + })); + + // Expect message NOT to be transported to client 2 because not subscribed to the intent. + expect(microfrontend2IntentMessageChannel.getValues()).not.toContain(jasmine.objectContaining({ + intent: jasmine.objectContaining({type: 'testee'}), + body: 'Intent message', + })); + }); + it('should allow tracking the subscriptions on a topic', async () => { await MicrofrontendPlatform.startHost({applications: []}); @@ -1281,7 +1378,7 @@ describe('Messaging', () => { .set(MessageHeaders.Timestamp, 'should-not-be-set') .set(MessageHeaders.ClientId, 'should-not-be-set') .set(MessageHeaders.AppSymbolicName, 'should-not-be-set') - .set(MessageHeaders.ɵTopicSubscriberId, 'should-not-be-set'), + .set(MessageHeaders.ɵSubscriberId, 'should-not-be-set'), }, ); @@ -1289,7 +1386,7 @@ describe('Messaging', () => { expect(headersCaptor.getLastValue().get(MessageHeaders.Timestamp)).not.toEqual('should-not-be-set'); expect(headersCaptor.getLastValue().get(MessageHeaders.ClientId)).not.toEqual('should-not-be-set'); expect(headersCaptor.getLastValue().get(MessageHeaders.AppSymbolicName)).not.toEqual('should-not-be-set'); - expect(headersCaptor.getLastValue().get(MessageHeaders.ɵTopicSubscriberId)).not.toEqual('should-not-be-set'); + expect(headersCaptor.getLastValue().get(MessageHeaders.ɵSubscriberId)).not.toEqual('should-not-be-set'); }); it('should prevent overriding platform specific message headers [request/reply]', async () => { @@ -1303,7 +1400,7 @@ describe('Messaging', () => { .set(MessageHeaders.Timestamp, 'should-not-be-set') .set(MessageHeaders.ClientId, 'should-not-be-set') .set(MessageHeaders.AppSymbolicName, 'should-not-be-set') - .set(MessageHeaders.ɵTopicSubscriberId, 'should-not-be-set'), + .set(MessageHeaders.ɵSubscriberId, 'should-not-be-set'), }, ).subscribe(); @@ -1311,7 +1408,7 @@ describe('Messaging', () => { expect(headersCaptor.getLastValue().get(MessageHeaders.Timestamp)).not.toEqual('should-not-be-set'); expect(headersCaptor.getLastValue().get(MessageHeaders.ClientId)).not.toEqual('should-not-be-set'); expect(headersCaptor.getLastValue().get(MessageHeaders.AppSymbolicName)).not.toEqual('should-not-be-set'); - expect(headersCaptor.getLastValue().get(MessageHeaders.ɵTopicSubscriberId)).not.toEqual('should-not-be-set'); + expect(headersCaptor.getLastValue().get(MessageHeaders.ɵSubscriberId)).not.toEqual('should-not-be-set'); }); it('should prevent overriding platform specific intent message headers [pub/sub]', async () => { @@ -1332,7 +1429,8 @@ describe('Messaging', () => { headers: new Map() .set(MessageHeaders.Timestamp, 'should-not-be-set') .set(MessageHeaders.ClientId, 'should-not-be-set') - .set(MessageHeaders.AppSymbolicName, 'should-not-be-set'), + .set(MessageHeaders.AppSymbolicName, 'should-not-be-set') + .set(MessageHeaders.ɵSubscriberId, 'should-not-be-set'), }, ); @@ -1360,7 +1458,8 @@ describe('Messaging', () => { headers: new Map() .set(MessageHeaders.Timestamp, 'should-not-be-set') .set(MessageHeaders.ClientId, 'should-not-be-set') - .set(MessageHeaders.AppSymbolicName, 'should-not-be-set'), + .set(MessageHeaders.AppSymbolicName, 'should-not-be-set') + .set(MessageHeaders.ɵSubscriberId, 'should-not-be-set'), }, ).subscribe(); @@ -1659,8 +1758,8 @@ describe('Messaging', () => { await expectEmissions(observeCaptor).toEqual(new Map().set('param2', 'value1')); // assert the deprecation warning - const expectedLogMessage = `[DEPRECATION] Application 'host-app' passes a deprecated parameter in the intent: 'param1'. Pass parameter 'param2' instead.`; - expect(readConsoleLog('warn', {filter: /\[DEPRECATION]/})).toEqual(jasmine.arrayContaining([expectedLogMessage])); + const expectedLogMessage = `[DEPRECATION][4EAC5956] Application 'host-app' passes a deprecated parameter in the intent: 'param1'. Pass parameter 'param2' instead.`; + expect(readConsoleLog('warn', {filter: /\[DEPRECATION]\[4EAC5956]/})).toEqual(jasmine.arrayContaining([expectedLogMessage])); }); it('should make deprecated params optional', async () => { @@ -1732,14 +1831,14 @@ describe('Messaging', () => { // publish without params await Beans.get(IntentClient).publish({type: 'capability'}); await expectEmissions(observeCaptor).toEqual(new Map()); - expect(readConsoleLog('warn', {filter: /\[DEPRECATION]/})).toEqual([]); + expect(readConsoleLog('warn', {filter: /\[DEPRECATION]\[4EAC5956]/})).toEqual([]); // publish with deprecated param 'param1' observeCaptor.reset(); await Beans.get(IntentClient).publish({type: 'capability', params: new Map().set('param1', 'value1')}); await expectEmissions(observeCaptor).toEqual(new Map().set('param1', 'value1')); - expect(readConsoleLog('warn', {filter: /\[DEPRECATION]/})).toEqual([ - `[DEPRECATION] Application 'host-app' passes a deprecated parameter in the intent: 'param1'. DEPRECATION NOTICE`, + expect(readConsoleLog('warn', {filter: /\[DEPRECATION]\[4EAC5956]/})).toEqual([ + `[DEPRECATION][4EAC5956] Application 'host-app' passes a deprecated parameter in the intent: 'param1'. DEPRECATION NOTICE`, ]); // publish with deprecated param 'param2' @@ -1747,8 +1846,8 @@ describe('Messaging', () => { resetLoggerSpy('warn'); await Beans.get(IntentClient).publish({type: 'capability', params: new Map().set('param2', 'value2')}); await expectEmissions(observeCaptor).toEqual(new Map().set('param5', 'value2')); - expect(readConsoleLog('warn', {filter: /\[DEPRECATION]/})).toEqual([ - `[DEPRECATION] Application 'host-app' passes a deprecated parameter in the intent: 'param2'. Pass parameter 'param5' instead. DEPRECATION NOTICE`, + expect(readConsoleLog('warn', {filter: /\[DEPRECATION]\[4EAC5956]/})).toEqual([ + `[DEPRECATION][4EAC5956] Application 'host-app' passes a deprecated parameter in the intent: 'param2'. Pass parameter 'param5' instead. DEPRECATION NOTICE`, ]); // publish with deprecated param 'param3' @@ -1756,8 +1855,8 @@ describe('Messaging', () => { resetLoggerSpy('warn'); await Beans.get(IntentClient).publish({type: 'capability', params: new Map().set('param3', 'value3')}); await expectEmissions(observeCaptor).toEqual(new Map().set('param5', 'value3')); - expect(readConsoleLog('warn', {filter: /\[DEPRECATION]/})).toEqual([ - `[DEPRECATION] Application 'host-app' passes a deprecated parameter in the intent: 'param3'. Pass parameter 'param5' instead.`, + expect(readConsoleLog('warn', {filter: /\[DEPRECATION]\[4EAC5956]/})).toEqual([ + `[DEPRECATION][4EAC5956] Application 'host-app' passes a deprecated parameter in the intent: 'param3'. Pass parameter 'param5' instead.`, ]); // publish with deprecated param 'param4' @@ -1765,8 +1864,8 @@ describe('Messaging', () => { resetLoggerSpy('warn'); await Beans.get(IntentClient).publish({type: 'capability', params: new Map().set('param4', 'value4')}); await expectEmissions(observeCaptor).toEqual(new Map().set('param4', 'value4')); - expect(readConsoleLog('warn', {filter: /\[DEPRECATION]/})).toEqual([ - `[DEPRECATION] Application 'host-app' passes a deprecated parameter in the intent: 'param4'.`, + expect(readConsoleLog('warn', {filter: /\[DEPRECATION]\[4EAC5956]/})).toEqual([ + `[DEPRECATION][4EAC5956] Application 'host-app' passes a deprecated parameter in the intent: 'param4'.`, ]); }); }); diff --git "a/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" "b/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" index 9b88157e..d43bc685 100644 --- "a/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" +++ "b/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" @@ -12,9 +12,7 @@ import {defer, Observable, Subscription} from 'rxjs'; import {Intent, IntentMessage, throwOnErrorStatus, TopicMessage} from '../../messaging.model'; import {BrokerGateway} from './broker-gateway'; import {MessagingChannel} from '../../ɵmessaging.model'; -import {filterByChannel, pluckMessage} from '../../operators'; -import {filter} from 'rxjs/operators'; -import {assertExactQualifier, QualifierMatcher} from '../../qualifier-matcher'; +import {assertExactQualifier} from '../../qualifier-matcher'; import {IntentClient, IntentOptions, IntentSelector} from './intent-client'; import {Beans} from '@scion/toolkit/bean-manager'; import {MessageHandler} from './message-handler'; @@ -53,13 +51,7 @@ export class ɵIntentClient implements IntentClient { } public observe$(selector?: IntentSelector): Observable> { - return this._brokerGateway.message$ - .pipe( - filterByChannel>(MessagingChannel.Intent), - pluckMessage(), - filter(message => !selector || !selector.type || selector.type === message.intent.type), - filter(message => !selector || !selector.qualifier || new QualifierMatcher(selector.qualifier, {evalAsterisk: true, evalOptional: true}).matches(message.intent.qualifier)), - ); + return this._brokerGateway.subscribeToIntent$(selector); } public onIntent(selector: IntentSelector, callback: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription { diff --git a/projects/scion/microfrontend-platform/src/lib/error.util.ts b/projects/scion/microfrontend-platform/src/lib/error.util.ts index 874ba610..f9fd0148 100644 --- a/projects/scion/microfrontend-platform/src/lib/error.util.ts +++ b/projects/scion/microfrontend-platform/src/lib/error.util.ts @@ -11,9 +11,9 @@ /** * Returns the error message if given an error object, or the `toString` representation otherwise. */ -export function stringifyError(error: any): string { +export function stringifyError(error: string | Error | unknown): string { if (error instanceof Error) { return error.message; } - return error?.toString(); + return `${error}`; } diff --git a/projects/scion/microfrontend-platform/src/lib/host/client-registry/client.registry.ts b/projects/scion/microfrontend-platform/src/lib/host/client-registry/client.registry.ts index 767ae533..0ab9298c 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/client-registry/client.registry.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/client-registry/client.registry.ts @@ -18,7 +18,12 @@ import {Client} from './client'; export abstract class ClientRegistry { /** - * Emits when unregistering a client. + * Emits when registered a client. + */ + public abstract readonly register$: Observable; + + /** + * Emits when unregistered a client. */ public abstract readonly unregister$: Observable; diff --git "a/projects/scion/microfrontend-platform/src/lib/host/client-registry/\311\265client.registry.ts" "b/projects/scion/microfrontend-platform/src/lib/host/client-registry/\311\265client.registry.ts" index bab39482..7f2e4546 100644 --- "a/projects/scion/microfrontend-platform/src/lib/host/client-registry/\311\265client.registry.ts" +++ "b/projects/scion/microfrontend-platform/src/lib/host/client-registry/\311\265client.registry.ts" @@ -17,6 +17,7 @@ export class ɵClientRegistry implements ClientRegistry, PreDestroy { private readonly _clientsById = new Map(); private readonly _clientsByWindow = new Map(); + public readonly register$ = new Subject(); public readonly unregister$ = new Subject(); public registerClient(client: Client): void { @@ -33,6 +34,7 @@ export class ɵClientRegistry implements ClientRegistry, PreDestroy { } this._clientsById.set(client.id, client); this._clientsByWindow.set(client.window, client); + this.register$.next(client); } public unregisterClient(client: Client): void { diff --git a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.spec.ts b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.spec.ts index c0b988b3..ce6093cb 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.spec.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.spec.ts @@ -446,8 +446,8 @@ describe('ManifestRegistry', () => { // Assert deprecation warning expect(readConsoleLog('warn')).toEqual(jasmine.arrayContaining([ - `[DEPRECATION] The 'host-app' application uses a deprecated API for declaring required parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param3', required: true}] }`, - `[DEPRECATION] The 'host-app' application uses a deprecated API for declaring optional parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param4', required: false}] }`, + `[DEPRECATION][AC3A912] The 'host-app' application uses a deprecated API for declaring required parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param3', required: true}] }`, + `[DEPRECATION][97C70E9] The 'host-app' application uses a deprecated API for declaring optional parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param4', required: false}] }`, ])); // Assert registration @@ -485,8 +485,8 @@ describe('ManifestRegistry', () => { // Assert deprecation warning expect(readConsoleLog('warn')).toEqual(jasmine.arrayContaining([ - `[DEPRECATION] The 'host-app' application uses a deprecated API for declaring required parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param3', required: true}] }`, - `[DEPRECATION] The 'host-app' application uses a deprecated API for declaring optional parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param4', required: false}] }`, + `[DEPRECATION][AC3A912] The 'host-app' application uses a deprecated API for declaring required parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param3', required: true}] }`, + `[DEPRECATION][97C70E9] The 'host-app' application uses a deprecated API for declaring optional parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: { params: [{name: 'param4', required: false}] }`, ])); // Assert registration diff --git a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.ts b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.ts index 4019b142..4e630433 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/manifest-registry.ts @@ -19,13 +19,13 @@ import {Intent} from '../../messaging.model'; export abstract class ManifestRegistry { /** - * Returns capabilities which are visible to the given application and which satisfy the given intent. + * Returns capabilities which are visible to the given application and match the given intent. * The intent is not allowed to contain wildcards in its qualifier. */ public abstract resolveCapabilitiesByIntent(intent: Intent, appSymbolicName: string): Capability[]; /** - * Tests whether the given app has declared an intention for the given intent, or is providing a capability fulfilling the given intent. + * Tests whether the given app has declared an intention for the given intent, or is providing a capability matching the given intent. */ public abstract hasIntention(intent: Intent, appSymbolicName: string): boolean; diff --git "a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" "b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" index 5a255f52..19b9a19f 100644 --- "a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" +++ "b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" @@ -86,7 +86,7 @@ export class ɵManifestRegistry implements ManifestRegistry, PreDestroy { } /** - * Tests whether the given app has declared a satisfying intention for the given capability. + * Tests whether the given app has declared a matching intention for the given capability. */ private hasIntentionForCapability(appSymbolicName: string, capability: Capability): boolean { const filter: ManifestObjectFilter = {appSymbolicName, type: capability.type, qualifier: capability.qualifier}; @@ -300,12 +300,12 @@ function coerceCapabilityParamDefinitions(capability: Capability, appSymbolicNam capability.requiredParams?.forEach(name => { // eslint-disable-line deprecation/deprecation params.push({name, required: true}); const migration = `{ params: [{name: '${name}', required: true}] }`; - Beans.get(Logger).warn(`[DEPRECATION] The '${appSymbolicName}' application uses a deprecated API for declaring required parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: ${migration}`, new LoggingContext(appSymbolicName), capability); + Beans.get(Logger).warn(`[DEPRECATION][AC3A912] The '${appSymbolicName}' application uses a deprecated API for declaring required parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: ${migration}`, new LoggingContext(appSymbolicName), capability); }); capability.optionalParams?.forEach(name => { // eslint-disable-line deprecation/deprecation params.push({name, required: false}); const migration = `{ params: [{name: '${name}', required: false}] }`; - Beans.get(Logger).warn(`[DEPRECATION] The '${appSymbolicName}' application uses a deprecated API for declaring optional parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: ${migration}`, new LoggingContext(appSymbolicName), capability); + Beans.get(Logger).warn(`[DEPRECATION][97C70E9] The '${appSymbolicName}' application uses a deprecated API for declaring optional parameters of a capability. The API will be removed in a future release. To migrate, declare parameters by using the 'Capability#params' property, as follows: ${migration}`, new LoggingContext(appSymbolicName), capability); }); capability.params?.forEach(param => { params.push(param); diff --git a/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-param-validator.interceptor.ts b/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-param-validator.interceptor.ts index 6bef58cf..ddd6dfaa 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-param-validator.interceptor.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-param-validator.interceptor.ts @@ -43,7 +43,7 @@ export class IntentParamValidator implements IntentInterceptor { if (paramMatcherResult.deprecatedParams.length) { paramMatcherResult.deprecatedParams.forEach(deprecatedParam => { const warning = toDeprecatedParamWarning(deprecatedParam, {appSymbolicName: sender}); - Beans.get(Logger).warn(`[DEPRECATION] ${warning}`, new LoggingContext(sender), intentMessage.intent); + Beans.get(Logger).warn(`[DEPRECATION][4EAC5956] ${warning}`, new LoggingContext(sender), intentMessage.intent); }); // Use the matcher's parameters to have deprecated params mapped to their replacement. intentMessage.intent.params = paramMatcherResult.params!; diff --git a/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-subscription.registry.ts b/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-subscription.registry.ts new file mode 100644 index 00000000..49dd0d88 --- /dev/null +++ b/projects/scion/microfrontend-platform/src/lib/host/message-broker/intent-subscription.registry.ts @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2018-2020 Swiss Federal Railways + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ + +import {Client} from '../client-registry/client'; +import {MessageSubscription, MessageSubscriptionRegistry} from './message-subscription.registry'; +import {Intent} from '../../messaging.model'; +import {IntentSelector} from '../../client/messaging/intent-client'; +import {QualifierMatcher} from '../../qualifier-matcher'; + +/** + * Central point for managing intent subscriptions. + * + * @ignore + */ +export class IntentSubscriptionRegistry extends MessageSubscriptionRegistry { + + /** + * @inheritDoc + */ + public override subscriptions(filter?: {subscriberId?: string; clientId?: string; appSymbolicName?: string; intent?: Intent}): IntentSubscription[] { + return super.subscriptions(filter).filter(subscription => filter?.intent ? subscription.matches(filter.intent) : true); + } +} + +/** + * Represents a subscription for intents matching the passed selector. + * + * @ignore + */ +export class IntentSubscription implements MessageSubscription { + + constructor(public readonly selector: IntentSelector, + public readonly subscriberId: string, + public readonly client: Client) { + } + + /** + * Tests whether the given intent matches this subscription. + * + * Note that only a type and qualifier check is performed, but not whether the application is eligible + * to receive matching intents, i.e., provides a fulfilling capability. + */ + public matches(intent: Intent): boolean { + if (this.selector?.type && this.selector.type !== intent.type) { + return false; + } + if (this.selector?.qualifier && !new QualifierMatcher(this.selector.qualifier, {evalAsterisk: true, evalOptional: true}).matches(intent.qualifier)) { + return false; + } + return true; + } +} diff --git a/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-broker.ts b/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-broker.ts index d58ef534..cf064a25 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-broker.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-broker.ts @@ -7,24 +7,21 @@ * * SPDX-License-Identifier: EPL-2.0 */ -import {EMPTY, fromEvent, MonoTypeOperatorFunction, Observable, of, Subject} from 'rxjs'; +import {EMPTY, fromEvent, MonoTypeOperatorFunction, Observable, of, pipe, Subject, tap} from 'rxjs'; import {catchError, filter, mergeMap, share, takeUntil} from 'rxjs/operators'; import {IntentMessage, Message, MessageHeaders, TopicMessage} from '../../messaging.model'; -import {ConnackMessage, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, TopicSubscribeCommand, TopicUnsubscribeCommand} from '../../ɵmessaging.model'; +import {ConnackMessage, IntentSubscribeCommand, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, TopicSubscribeCommand, UnsubscribeCommand} from '../../ɵmessaging.model'; import {ApplicationRegistry} from '../application-registry'; import {ManifestRegistry} from '../manifest-registry/manifest-registry'; -import {Defined} from '@scion/toolkit/util'; import {UUID} from '@scion/toolkit/uuid'; import {Logger, LoggingContext} from '../../logger'; import {runSafe} from '../../safe-runner'; -import {TopicSubscriptionRegistry} from './topic-subscription.registry'; +import {TopicSubscription, TopicSubscriptionRegistry} from './topic-subscription.registry'; import {ClientRegistry} from '../client-registry/client.registry'; -import {RetainedMessageStore} from './retained-message-store'; -import {TopicMatcher} from '../../topic-matcher.util'; import {chainInterceptors, IntentInterceptor, MessageInterceptor, PublishInterceptorChain} from './message-interception'; import {Beans, Initializer, PreDestroy} from '@scion/toolkit/bean-manager'; import {Runlevel} from '../../platform-state'; -import {APP_IDENTITY, Capability} from '../../platform.model'; +import {APP_IDENTITY} from '../../platform.model'; import {bufferUntil} from '@scion/toolkit/operators'; import {filterByChannel, filterByTopicChannel, filterByTransport} from '../../operators'; import {Client} from '../client-registry/client'; @@ -33,6 +30,11 @@ import {VERSION} from '../../version'; import {CLIENT_HEARTBEAT_INTERVAL} from '../client-registry/client.constants'; import {ɵClient} from '../client-registry/ɵclient'; import {stringifyError} from '../../error.util'; +import {IntentSubscription, IntentSubscriptionRegistry} from './intent-subscription.registry'; +import {RetainedMessageStore} from './retained-message-store'; +import {TopicMatcher} from '../../topic-matcher.util'; +import {IntentSubscriptionLegacySupport} from '../../intent-subscription-legacy-support'; +import {Defined} from '@scion/toolkit/util'; /** * The broker is responsible for receiving all messages, filtering the messages, determining who is @@ -58,6 +60,7 @@ export class MessageBroker implements Initializer, PreDestroy { private readonly _clientRegistry = Beans.get(ClientRegistry); private readonly _topicSubscriptionRegistry = Beans.get(TopicSubscriptionRegistry); + private readonly _intentSubscriptionRegistry = Beans.get(IntentSubscriptionRegistry); private readonly _retainedMessageRegistry = new RetainedMessageStore(); private readonly _applicationRegistry: ApplicationRegistry; @@ -76,9 +79,10 @@ export class MessageBroker implements Initializer, PreDestroy { this._clientMessage$ = fromEvent(window, 'message') .pipe( filterByTransport(MessagingTransport.ClientToBroker), - filterByChannel(MessagingChannel.Intent, MessagingChannel.Topic, MessagingChannel.TopicSubscribe, MessagingChannel.TopicUnsubscribe), + filterByChannel(MessagingChannel.Intent, MessagingChannel.Topic, MessagingChannel.TopicSubscribe, MessagingChannel.TopicUnsubscribe, MessagingChannel.IntentSubscribe, MessagingChannel.IntentUnsubscribe), bufferUntil(Beans.whenRunlevel(Runlevel.Two)), checkOriginTrusted(), + sanitizeMessageHeaders(), catchErrorAndRetry(), share(), ); @@ -89,12 +93,16 @@ export class MessageBroker implements Initializer, PreDestroy { // Install message dispatchers. this.installTopicMessageDispatcher(); - this.installIntentMessageDispatcher(); - - // Install topic subscriptions listeners. this.installTopicSubscribeListener(); this.installTopicUnsubscribeListener(); this.installTopicSubscriberCountObserver(); + this.sendRetainedTopicMessageOnSubscribe(); + + // Install intent dispatchers. + this.installIntentMessageDispatcher(); + this.installIntentSubscribeListener(); + this.installIntentUnsubscribeListener(); + IntentSubscriptionLegacySupport.installLegacyClientIntentSubscription(); // Assemble message interceptors to a chain of handlers which are called one after another. The publisher is added as terminal handler. this._messagePublisher = this.createMessagePublisher(); @@ -209,72 +217,6 @@ export class MessageBroker implements Initializer, PreDestroy { })); } - /** - * Listens for topic subscribe commands. - */ - private installTopicSubscribeListener(): void { - this._clientMessage$ - .pipe( - filterByChannel(MessagingChannel.TopicSubscribe), - takeUntil(this._destroy$), - ) - .subscribe((event: MessageEvent>) => runSafe(() => { - const client = getSendingClient(event); - const envelope = event.data; - const topic = envelope.message.topic; - const subscriberId = envelope.message.subscriberId; - const messageId = envelope.message.headers.get(MessageHeaders.MessageId); - - if (!topic) { - sendDeliveryStatusError(client, messageId, '[TopicSubscribeError] Missing required property on message: topic'); - return; - } - if (!subscriberId) { - sendDeliveryStatusError(client, messageId, '[TopicSubscribeError] Missing required property on message: subscriberId'); - return; - } - - this._topicSubscriptionRegistry.subscribe(topic, client, subscriberId); - sendDeliveryStatusSuccess(client, messageId); - - // Dispatch a retained message, if any. - const retainedMessage = this._retainedMessageRegistry.findMostRecentRetainedMessage(topic); - if (retainedMessage) { - const retainedMessageWorkingCopy = { - ...retainedMessage, - headers: new Map(retainedMessage.headers).set(MessageHeaders.ɵTopicSubscriberId, subscriberId), - params: new TopicMatcher(topic).match(retainedMessage.topic).params, - }; - sendTopicMessage(client, retainedMessageWorkingCopy); - } - })); - } - - /** - * Listens for topic unsubscribe commands. - */ - private installTopicUnsubscribeListener(): void { - this._clientMessage$ - .pipe( - filterByChannel(MessagingChannel.TopicUnsubscribe), - takeUntil(this._destroy$), - ) - .subscribe((event: MessageEvent>) => runSafe(() => { - const client = getSendingClient(event); - const envelope = event.data; - const subscriberId = envelope.message.subscriberId; - const messageId = envelope.message.headers.get(MessageHeaders.MessageId); - - if (!subscriberId) { - sendDeliveryStatusError(client, messageId, '[TopicUnsubscribeError] Missing required property on message: subscriberId'); - return; - } - - this._topicSubscriptionRegistry.unsubscribe(subscriberId); - sendDeliveryStatusSuccess(client, messageId); - })); - } - /** * Replies to requests to observe the number of subscribers on a topic. */ @@ -295,7 +237,7 @@ export class MessageBroker implements Initializer, PreDestroy { this._topicSubscriptionRegistry.subscriptionCount$(topic) .pipe(takeUntil(this._topicSubscriptionRegistry.subscriptionCount$(replyTo).pipe(filter(count => count === 0)))) .subscribe((count: number) => runSafe(() => { // eslint-disable-line rxjs/no-nested-subscribe - this.dispatchTopicMessage({ + this._messagePublisher.interceptAndPublish({ topic: replyTo, body: count, headers: new Map() @@ -321,12 +263,18 @@ export class MessageBroker implements Initializer, PreDestroy { const topicMessage = event.data.message; const messageId = topicMessage.headers.get(MessageHeaders.MessageId); + if (!topicMessage.topic) { + const error = '[TopicDispatchError] Missing property: topic'; + sendDeliveryStatusError(client, messageId, error); + return; + } + try { await this._messagePublisher.interceptAndPublish(topicMessage); sendDeliveryStatusSuccess(client, messageId); } catch (error) { - sendDeliveryStatusError(client, messageId, stringifyError(error)); + sendDeliveryStatusError(client, messageId, error); } })); } @@ -376,7 +324,120 @@ export class MessageBroker implements Initializer, PreDestroy { sendDeliveryStatusSuccess(client, messageId); } catch (error) { - sendDeliveryStatusError(client, messageId, stringifyError(error)); + sendDeliveryStatusError(client, messageId, error); + } + })); + } + + private sendRetainedTopicMessageOnSubscribe(): void { + this._topicSubscriptionRegistry.register$ + .pipe(takeUntil(this._destroy$)) + .subscribe(subscription => { + const retainedMessage = this._retainedMessageRegistry.findMostRecentRetainedMessage(subscription.topic); + if (retainedMessage) { + const headers = new Map(retainedMessage.headers).set(MessageHeaders.ɵSubscriberId, subscription.subscriberId); + this._messagePublisher.interceptAndPublish({...retainedMessage, headers}); + } + }); + } + + /** + * Listens for topic subscription requests. + */ + private installTopicSubscribeListener(): void { + this._clientMessage$ + .pipe( + filterByChannel(MessagingChannel.TopicSubscribe), + takeUntil(this._destroy$), + ) + .subscribe((event: MessageEvent>) => runSafe(() => { + const client = getSendingClient(event); + const envelope = event.data; + const messageId = envelope.message.headers.get(MessageHeaders.MessageId); + + try { + const subscriberId = Defined.orElseThrow(envelope.message.subscriberId, () => Error('[TopicSubscribeError] Missing property: subscriberId')); + const topic = Defined.orElseThrow(envelope.message.topic, () => Error('[TopicSubscribeError] Missing property: topic')); + this._topicSubscriptionRegistry.register(new TopicSubscription(topic, subscriberId, client)); + sendDeliveryStatusSuccess(client, messageId); + } + catch (error) { + sendDeliveryStatusError(client, messageId, error); + } + })); + } + + /** + * Listens for topic unsubscription requests. + */ + private installTopicUnsubscribeListener(): void { + this._clientMessage$ + .pipe( + filterByChannel(MessagingChannel.TopicUnsubscribe), + takeUntil(this._destroy$), + ) + .subscribe((event: MessageEvent>) => runSafe(() => { + const client = getSendingClient(event); + const envelope = event.data; + const messageId = envelope.message.headers.get(MessageHeaders.MessageId); + + try { + const subscriberId = Defined.orElseThrow(envelope.message.subscriberId, () => Error('[TopicUnsubscribeError] Missing property: subscriberId')); + this._topicSubscriptionRegistry.unregister({subscriberId}); + sendDeliveryStatusSuccess(client, messageId); + } + catch (error) { + sendDeliveryStatusError(client, messageId, error); + } + })); + } + + /** + * Listens for intent subscription requests. + */ + private installIntentSubscribeListener(): void { + this._clientMessage$ + .pipe( + filterByChannel(MessagingChannel.IntentSubscribe), + takeUntil(this._destroy$), + ) + .subscribe((event: MessageEvent>) => runSafe(() => { + const client = getSendingClient(event); + const envelope = event.data; + const messageId = envelope.message.headers.get(MessageHeaders.MessageId); + + try { + const subscriberId = Defined.orElseThrow(envelope.message.subscriberId, () => Error('[IntentSubscribeError] Missing property: subscriberId')); + this._intentSubscriptionRegistry.register(new IntentSubscription(envelope.message.selector || {}, subscriberId, client)); + sendDeliveryStatusSuccess(client, messageId); + } + catch (error) { + sendDeliveryStatusError(client, messageId, error); + } + })); + } + + /** + * Listens for intent unsubscription requests. + */ + private installIntentUnsubscribeListener(): void { + this._clientMessage$ + .pipe( + filterByChannel(MessagingChannel.IntentUnsubscribe), + takeUntil(this._destroy$), + ) + .subscribe((event: MessageEvent>) => runSafe(() => { + const client = getSendingClient(event); + const envelope = event.data; + const messageId = envelope.message.headers.get(MessageHeaders.MessageId); + + try { + const subscriberId = Defined.orElseThrow(envelope.message.subscriberId, () => Error('[IntentUnsubscribeError] Missing property: subscriberId')); + this._intentSubscriptionRegistry.unregister({subscriberId}); + sendDeliveryStatusSuccess(client, messageId); + } + catch (error) { + sendDeliveryStatusError(client, messageId, error); } })); } @@ -391,13 +452,17 @@ export class MessageBroker implements Initializer, PreDestroy { return; // Deletion events for retained messages are swallowed. } - // Dispatch the message. - const dispatched = this.dispatchTopicMessage(message); + const subscribers = this._topicSubscriptionRegistry.subscriptions({ + subscriberId: message.headers.get(MessageHeaders.ɵSubscriberId), + topic: message.topic, + }); - // If request-reply communication, throw an error if no replier is found to reply to the topic. - if (!dispatched && message.headers.has(MessageHeaders.ReplyTo)) { - throw Error(`[RequestReplyError] No client is currently running which could answer the request sent to the topic '${message.topic}'.`); + // If request-reply communication, reply with an error if no subscriber is registered to answer the request. + if (message.headers.has(MessageHeaders.ReplyTo) && !subscribers.length) { + throw Error(`[RequestReplyError] No subscriber registered to answer the request [topic=${message.topic}]`); } + + subscribers.forEach(subscriber => runSafe(() => sendTopicMessage(subscriber, message))); }); } @@ -406,58 +471,21 @@ export class MessageBroker implements Initializer, PreDestroy { */ private createIntentPublisher(): PublishInterceptorChain { return chainInterceptors(Beans.all(IntentInterceptor), async (message: IntentMessage): Promise => { - const capability = Defined.orElseThrow(message.capability, () => Error(`[IllegalStateError] Missing target capability on intent message: ${JSON.stringify(message)}`)); - const clients = this._clientRegistry.getByApplication(capability.metadata!.appSymbolicName); + const subscribers = this._intentSubscriptionRegistry.subscriptions({ + subscriberId: message.headers.get(MessageHeaders.ɵSubscriberId), + appSymbolicName: message.capability.metadata!.appSymbolicName, + intent: message.intent, + }); - // If request-reply communication, send an error if no replier is running to reply to the intent. - if (message.headers.has(MessageHeaders.ReplyTo) && !this.existsClient(capability)) { - throw Error(`[RequestReplyError] No client is currently running which could answer the intent '{type=${message.intent.type}, qualifier=${JSON.stringify(message.intent.qualifier)}}'.`); + // If request-reply communication, reply with an error if no subscriber is registered to answer the intent. + if (message.headers.has(MessageHeaders.ReplyTo) && !subscribers.length) { + throw Error(`[RequestReplyError] No subscriber registered to answer the intent [intent=${JSON.stringify(message.intent)}]`); } - clients - .filter(client => !client.stale) - .forEach(client => runSafe(() => { - const envelope: MessageEnvelope = { - transport: MessagingTransport.BrokerToClient, - channel: MessagingChannel.Intent, - message: message, - }; - client.window.postMessage(envelope, client.application.messageOrigin); - })); + subscribers.forEach(subscriber => runSafe(() => sendIntentMessage(subscriber, message))); }); } - /** - * Dispatches the given topic message to subscribed clients on the transport {@link MessagingTransport.BrokerToClient}. - * - * @return `true` if dispatched the message to at minimum one subscriber, or `false` if no subscriber is subscribed to the given message topic. - */ - private dispatchTopicMessage(topicMessage: TopicMessage): boolean { - const destinations = this._topicSubscriptionRegistry.resolveTopicDestinations(topicMessage.topic); - if (!destinations.length) { - return false; - } - - destinations.forEach(resolvedTopicDestination => runSafe(() => { - const client: Client = resolvedTopicDestination.subscription.client; - sendTopicMessage(client, { - ...topicMessage, - topic: resolvedTopicDestination.topic, - params: resolvedTopicDestination.params, - headers: new Map(topicMessage.headers).set(MessageHeaders.ɵTopicSubscriberId, resolvedTopicDestination.subscription.subscriberId), - }); - })); - - return true; - } - - /** - * Tests if at least one client is running that can handle the specified capability. - */ - private existsClient(capability: Capability): boolean { - return this._clientRegistry.getByApplication(capability.metadata!.appSymbolicName).length > 0; - } - public preDestroy(): void { this._destroy$.next(); } @@ -538,41 +566,60 @@ function sendDeliveryStatusSuccess(target: MessageTarget | Client, topic: string } /** @ignore */ -function sendDeliveryStatusError(target: MessageTarget | Client, topic: string, error: string): void { +function sendDeliveryStatusError(target: MessageTarget | Client, topic: string, error: string | Error | unknown): void { sendTopicMessage(target, { topic: topic, - body: {ok: false, details: error}, + body: {ok: false, details: stringifyError(error)}, headers: new Map(), }); } /** @ignore */ -function sendTopicMessage(target: MessageTarget | Client, message: TopicMessage): void { +function sendTopicMessage(target: MessageTarget | Client | TopicSubscription, message: TopicMessage): void { const envelope: MessageEnvelope> = { transport: MessagingTransport.BrokerToClient, channel: MessagingChannel.Topic, - message: {...message}, + message: { + ...message, + params: new Map(message.params || new Map()), + headers: new Map(message.headers || new Map()) + .set(MessageHeaders.MessageId, message.headers.get(MessageHeaders.MessageId) ?? UUID.randomUUID()) + .set(MessageHeaders.AppSymbolicName, message.headers.get(MessageHeaders.AppSymbolicName) ?? Beans.get(APP_IDENTITY)), + }, }; - envelope.message.params = new Map(envelope.message.params || new Map()); - envelope.message.headers = new Map(envelope.message.headers || new Map()); - - const headers = envelope.message.headers; - if (!headers.has(MessageHeaders.MessageId)) { - headers.set(MessageHeaders.MessageId, UUID.randomUUID()); - } - if (!headers.has(MessageHeaders.AppSymbolicName)) { - headers.set(MessageHeaders.AppSymbolicName, Beans.get(APP_IDENTITY)); - } - if (target instanceof MessageTarget) { !target.window.closed && target.window.postMessage(envelope, target.origin); } + else if (target instanceof TopicSubscription) { + const subscription = target; + const client = subscription.client; + envelope.message.headers.set(IntentSubscriptionLegacySupport.subscriberIdMessageHeaderName(client.version), target.subscriberId); + envelope.message.params = new TopicMatcher(subscription.topic).match(message.topic).params; + !client.stale && client.window.postMessage(envelope, client.application.messageOrigin); + } else { !target.stale && target.window.postMessage(envelope, target.application.messageOrigin); } } +/** @ignore */ +function sendIntentMessage(subscription: IntentSubscription, message: IntentMessage): void { + const envelope: MessageEnvelope = { + transport: MessagingTransport.BrokerToClient, + channel: MessagingChannel.Intent, + message: { + ...message, + headers: new Map(message.headers || new Map()) + .set(MessageHeaders.ɵSubscriberId, subscription.subscriberId) + .set(MessageHeaders.MessageId, message.headers.get(MessageHeaders.MessageId) ?? UUID.randomUUID()) + .set(MessageHeaders.AppSymbolicName, message.headers.get(MessageHeaders.AppSymbolicName) ?? Beans.get(APP_IDENTITY)), + }, + }; + const client = subscription.client; + !client.stale && client.window.postMessage(envelope, client.application.messageOrigin); +} + /** * Catches and logs errors, and resubscribes to the source observable. * @@ -585,6 +632,18 @@ function catchErrorAndRetry(): MonoTypeOperatorFunction { }); } +/** + * Sanitizes message headers that should not be set by clients. + */ +function sanitizeMessageHeaders(): MonoTypeOperatorFunction>> { + return pipe( + /** + * The subscriber identifier is set exclusively by the host when it dispatches a message to a subscribed client. + */ + tap(event => event.data.message.headers.delete(MessageHeaders.ɵSubscriberId)), + ); +} + /** * Represents the target where to send a message. * diff --git a/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-subscription.registry.ts b/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-subscription.registry.ts new file mode 100644 index 00000000..89e57e2e --- /dev/null +++ b/projects/scion/microfrontend-platform/src/lib/host/message-broker/message-subscription.registry.ts @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2018-2022 Swiss Federal Railways + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ + +import {Client} from '../client-registry/client'; +import {Beans, PreDestroy} from '@scion/toolkit/bean-manager'; +import {ClientRegistry} from '../client-registry/client.registry'; +import {takeUntil} from 'rxjs/operators'; +import {runSafe} from '../../safe-runner'; +import {Observable, Subject} from 'rxjs'; +import {Arrays, Maps} from '@scion/toolkit/util'; + +/** + * Central point for managing message subscriptions. + * + * @ignore + */ +export class MessageSubscriptionRegistry implements PreDestroy { + + private readonly _destroy$ = new Subject(); + private readonly _subscriptions = new Map(); + private readonly _subscriptionsByApp = new Map(); + private readonly _subscriptionsByClient = new Map(); + + private readonly _register$ = new Subject(); + private readonly _unregister$ = new Subject(); + + constructor() { + Beans.get(ClientRegistry).unregister$ + .pipe(takeUntil(this._destroy$)) + .subscribe((client: Client) => runSafe(() => { + this.unregister({clientId: client.id}); + })); + } + + /** + * Registers given subscription. + */ + public register(subscription: T): void { + this._subscriptions.set(subscription.subscriberId, subscription); + Maps.addListValue(this._subscriptionsByApp, subscription.client.application.symbolicName, subscription); + Maps.addListValue(this._subscriptionsByClient, subscription.client.id, subscription); + this._register$.next(subscription); + } + + /** + * Unregisters matching subscriptions. + * + * @param filter - Control which subscriptions to remove by specifying filter criteria which are "AND"ed together. + */ + public unregister(filter: {subscriberId?: string; clientId?: string}): void { + this.subscriptions(filter).forEach(subscription => { + this._subscriptions.delete(subscription.subscriberId); + Maps.removeListValue(this._subscriptionsByApp, subscription.client.application.symbolicName, subscription); + Maps.removeListValue(this._subscriptionsByClient, subscription.client.id, subscription); + }); + this._unregister$.next(); + } + + /** + * Returns subscriptions matching the passed filter. + * + * @param filter - Control which subscriptions to return by specifying filter criteria which are "AND"ed together. + * If not specified, returns all subscriptions. + */ + public subscriptions(filter?: {subscriberId?: string; clientId?: string; appSymbolicName?: string}): T[] { + const filterById = filter?.subscriberId; + const filterByClient = filter?.clientId; + const filterByApp = filter?.appSymbolicName; + + return Arrays.intersect( + filterById ? Arrays.coerce(this._subscriptions.get(filterById)) : undefined, + filterByClient ? Arrays.coerce(this._subscriptionsByClient.get(filterByClient)) : undefined, + filterByApp ? Arrays.coerce(this._subscriptionsByApp.get(filterByApp)) : undefined, + (filterById || filterByApp || filterByClient) ? undefined : Array.from(this._subscriptions.values()), + ); + } + + /** + * Emits when registered a subscription via {@link MessageSubscriptionRegistry#register}. + */ + public get register$(): Observable { + return this._register$; + } + + /** + * Emits when unregistered a subscription via {@link MessageSubscriptionRegistry#unregister}. + */ + public get unregister$(): Observable { + return this._unregister$; + } + + public preDestroy(): void { + this._destroy$.next(); + } +} + +/** + * Represents a subscription for a given subscriber. + * + * @ignore + */ +export interface MessageSubscription { + readonly subscriberId: string; + readonly client: Client; +} diff --git a/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.spec.ts b/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.spec.ts index a9108104..bf97c2bc 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.spec.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.spec.ts @@ -7,7 +7,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -import {TopicSubscriptionRegistry} from './topic-subscription.registry'; +import {TopicSubscription, TopicSubscriptionRegistry} from './topic-subscription.registry'; import {ClientRegistry} from '../client-registry/client.registry'; import {expectEmissions} from '../../testing/spec.util.spec'; import {ObserveCaptor} from '@scion/toolkit/testing'; @@ -36,22 +36,22 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client1, 'subscriber#1'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#1', client1)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client2, 'subscriber#2'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#2', client2)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client3, 'subscriber#3'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#3', client3)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(3); - subscriptionRegistry.unsubscribe('subscriber#1'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#1'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); - subscriptionRegistry.unsubscribe('subscriber#2'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#2'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); - subscriptionRegistry.unsubscribe('subscriber#3'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#3'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); await expectEmissions(subscriptionCountCaptor).toEqual([0, 1, 2, 3, 2, 1, 0]); @@ -67,22 +67,22 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client, 'subscriber#1'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#1', client)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client, 'subscriber#2'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#2', client)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client, 'subscriber#3'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#3', client)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(3); - subscriptionRegistry.unsubscribe('subscriber#1'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#1'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); - subscriptionRegistry.unsubscribe('subscriber#2'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#2'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); - subscriptionRegistry.unsubscribe('subscriber#3'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#3'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); await expectEmissions(subscriptionCountCaptor).toEqual([0, 1, 2, 3, 2, 1, 0]); @@ -92,7 +92,7 @@ describe('TopicSubscriptionRegistry', () => { await MicrofrontendPlatform.startHost({applications: []}); const subscriptionRegistry = Beans.get(TopicSubscriptionRegistry); - subscriptionRegistry.unsubscribe('does-not-exist'); + subscriptionRegistry.unregister({subscriberId: 'does-not-exist'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); }); @@ -118,27 +118,27 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(0); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client, 'subscriber#1'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#1', client)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(0); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client, 'subscriber#2'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'subscriber#2', client)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(1); - subscriptionRegistry.subscribe('myhome/livingroom/humidity', client, 'subscriber#3'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/humidity', 'subscriber#3', client)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(2); - subscriptionRegistry.unsubscribe('subscriber#2'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#2'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(1); - subscriptionRegistry.unsubscribe('subscriber#1'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#1'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(1); - subscriptionRegistry.unsubscribe('subscriber#3'); + subscriptionRegistry.unregister({subscriberId: 'subscriber#3'}); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(0); await expectSubscriptionCount('myhome/livingroom/humidity').toBe(0); @@ -153,7 +153,7 @@ describe('TopicSubscriptionRegistry', () => { const client1 = newClient('client#1'); const client2 = newClient('client#2'); - subscriptionRegistry.subscribe('myhome/:room/temperature', client1, 'subscriber#1'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'subscriber#1', client1)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(1); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(0); @@ -161,7 +161,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(1); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(0); - subscriptionRegistry.subscribe('myhome/:room/temperature', client1, 'subscriber#2'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'subscriber#2', client1)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(2); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(0); @@ -169,7 +169,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(2); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(0); - subscriptionRegistry.subscribe('myhome/:room/temperature', client2, 'subscriber#3'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'subscriber#3', client2)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(3); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(0); @@ -177,7 +177,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(3); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(0); - subscriptionRegistry.subscribe('myhome/:room/temperature', client2, 'subscriber#4'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'subscriber#4', client2)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(4); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(0); @@ -185,7 +185,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(4); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(0); - subscriptionRegistry.subscribe('myhome/:room/:measurement', client1, 'subscriber#5'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement', 'subscriber#5', client1)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(5); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(0); @@ -193,7 +193,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(5); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(0); - subscriptionRegistry.subscribe('myhome/:room/:measurement', client2, 'subscriber#6'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement', 'subscriber#6', client2)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(6); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(0); @@ -201,7 +201,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(6); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(0); - subscriptionRegistry.subscribe('myhome/:room/:measurement/:unit', client1, 'subscriber#7'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement/:unit', 'subscriber#7', client1)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(6); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(1); @@ -209,7 +209,7 @@ describe('TopicSubscriptionRegistry', () => { await expectSubscriptionCount('myhome/kitchen/temperature').toBe(6); await expectSubscriptionCount('myhome/kitchen/temperature/celcius').toBe(1); - subscriptionRegistry.subscribe('myhome/:room/:measurement/:unit', client2, 'subscriber#8'); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement/:unit', 'subscriber#8', client2)); await expectSubscriptionCount('myhome/livingroom').toBe(0); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(6); await expectSubscriptionCount('myhome/livingroom/temperature/celcius').toBe(2); @@ -232,15 +232,15 @@ describe('TopicSubscriptionRegistry', () => { const subscriptionCountCaptor = new ObserveCaptor(); subscriptionRegistry.subscriptionCount$('myhome/livingroom/temperature').subscribe(subscriptionCountCaptor); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client1, 'subscriber#1'); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client1, 'subscriber#2'); - subscriptionRegistry.subscribe('myhome/:livingroom/:measurement', client1, 'subscriber#3'); - subscriptionRegistry.subscribe(':building/:livingroom/:measurement', client1, 'subscriber#4'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#1', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'subscriber#2', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/:livingroom/:measurement', 'subscriber#3', client1)); + subscriptionRegistry.register(new TopicSubscription(':building/:livingroom/:measurement', 'subscriber#4', client1)); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client2, 'subscriber#5'); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client2, 'subscriber#6'); - subscriptionRegistry.subscribe('myhome/:livingroom/:measurement', client2, 'subscriber#7'); - subscriptionRegistry.subscribe(':building/:livingroom/:measurement', client2, 'subscriber#8'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'subscriber#5', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'subscriber#6', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/:livingroom/:measurement', 'subscriber#7', client2)); + subscriptionRegistry.register(new TopicSubscription(':building/:livingroom/:measurement', 'subscriber#8', client2)); await expectSubscriptionCount('myhome/livingroom/temperature').toBe(8); @@ -260,24 +260,23 @@ describe('TopicSubscriptionRegistry', () => { const client1 = newClient('client#1'); const client2 = newClient('client#2'); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client1, 'client#1;sub#1'); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client1, 'client#1;sub#2'); - subscriptionRegistry.subscribe('myhome/kitchen/:measurement', client1, 'client#1;sub#3'); - subscriptionRegistry.subscribe('myhome/:room/temperature', client1, 'client#1;sub#4'); - subscriptionRegistry.subscribe('myhome/:room/:measurement', client1, 'client#1;sub#5'); - subscriptionRegistry.subscribe(':building/kitchen/:measurement', client1, 'client#1;sub#6'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'client#1;sub#1', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'client#1;sub#2', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/kitchen/:measurement', 'client#1;sub#3', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'client#1;sub#4', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement', 'client#1;sub#5', client1)); + subscriptionRegistry.register(new TopicSubscription(':building/kitchen/:measurement', 'client#1;sub#6', client1)); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client2, 'client#2;sub#1'); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client2, 'client#2;sub#2'); - subscriptionRegistry.subscribe('myhome/kitchen/:measurement', client2, 'client#2;sub#3'); - subscriptionRegistry.subscribe('myhome/:room/temperature', client2, 'client#2;sub#4'); - subscriptionRegistry.subscribe('myhome/:room/:measurement', client2, 'client#2;sub#5'); - subscriptionRegistry.subscribe(':building/kitchen/:measurement', client2, 'client#2;sub#6'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'client#2;sub#1', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'client#2;sub#2', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/kitchen/:measurement', 'client#2;sub#3', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'client#2;sub#4', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement', 'client#2;sub#5', client2)); + subscriptionRegistry.register(new TopicSubscription(':building/kitchen/:measurement', 'client#2;sub#6', client2)); // Resolve the subscribers which observe the topic 'myhome/livingroom/temperature'. - const destinations = subscriptionRegistry.resolveTopicDestinations('myhome/livingroom/temperature'); - - expect(destinations.map(destination => destination.subscription.subscriberId)).toEqual([ + const subscribers = subscriptionRegistry.subscriptions({topic: 'myhome/livingroom/temperature'}); + expect(subscribers.map(subscription => subscription.subscriberId)).toEqual([ 'client#1;sub#1', 'client#1;sub#2', 'client#1;sub#4', @@ -288,78 +287,46 @@ describe('TopicSubscriptionRegistry', () => { 'client#2;sub#5', ]); - expect(destinations[0]).withContext('(a)').toEqual({ - topic: 'myhome/livingroom/temperature', - params: new Map(), - subscription: { - subscriberId: 'client#1;sub#1', - topic: 'myhome/livingroom/temperature', - client: client1, - }, - }); - expect(destinations[1]).withContext('(b)').toEqual({ - topic: 'myhome/livingroom/temperature', - params: new Map().set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#1;sub#2', - topic: 'myhome/livingroom/:measurement', - client: client1, - }, - }); - expect(destinations[2]).withContext('(c)').toEqual({ - topic: 'myhome/livingroom/temperature', - params: new Map().set('room', 'livingroom'), - subscription: { - subscriberId: 'client#1;sub#4', - topic: 'myhome/:room/temperature', - client: client1, - }, - }); - expect(destinations[3]).withContext('(d)').toEqual({ - topic: 'myhome/livingroom/temperature', - params: new Map().set('room', 'livingroom').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#1;sub#5', - topic: 'myhome/:room/:measurement', - client: client1, - }, - }); - expect(destinations[4]).withContext('(e)').toEqual({ - topic: 'myhome/livingroom/temperature', - params: new Map(), - subscription: { - subscriberId: 'client#2;sub#1', - topic: 'myhome/livingroom/temperature', - client: client2, - }, - }); - expect(destinations[5]).withContext('(f)').toEqual({ - topic: 'myhome/livingroom/temperature', - params: new Map().set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#2;sub#2', - topic: 'myhome/livingroom/:measurement', - client: client2, - }, - }); - expect(destinations[6]).withContext('(g)').toEqual({ + expect(subscribers[0]).withContext('(a)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#1', topic: 'myhome/livingroom/temperature', - params: new Map().set('room', 'livingroom'), - subscription: { - subscriberId: 'client#2;sub#4', - topic: 'myhome/:room/temperature', - client: client2, - }, - }); - expect(destinations[7]).withContext('(h)').toEqual({ + client: client1, + })); + expect(subscribers[1]).withContext('(b)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#2', + topic: 'myhome/livingroom/:measurement', + client: client1, + })); + expect(subscribers[2]).withContext('(c)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#4', + topic: 'myhome/:room/temperature', + client: client1, + })); + expect(subscribers[3]).withContext('(d)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#5', + topic: 'myhome/:room/:measurement', + client: client1, + })); + expect(subscribers[4]).withContext('(e)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#1', topic: 'myhome/livingroom/temperature', - params: new Map().set('room', 'livingroom').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#2;sub#5', - topic: 'myhome/:room/:measurement', - client: client2, - }, - }); + client: client2, + })); + expect(subscribers[5]).withContext('(f)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#2', + topic: 'myhome/livingroom/:measurement', + client: client2, + })); + expect(subscribers[6]).withContext('(g)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#4', + topic: 'myhome/:room/temperature', + client: client2, + })); + expect(subscribers[7]).withContext('(h)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#5', + topic: 'myhome/:room/:measurement', + client: client2, + })); }); it('should resolve subscribers which observe the topic \'myhome/kitchen/temperature\'', async () => { @@ -369,26 +336,25 @@ describe('TopicSubscriptionRegistry', () => { const client1 = newClient('client#1'); const client2 = newClient('client#2'); - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client1, 'client#1;sub#1'); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client1, 'client#1;sub#2'); - subscriptionRegistry.subscribe('myhome/kitchen/:measurement', client1, 'client#1;sub#3'); - subscriptionRegistry.subscribe('myhome/:room/temperature', client1, 'client#1;sub#4'); - subscriptionRegistry.subscribe('myhome/:room/:measurement', client1, 'client#1;sub#5'); - subscriptionRegistry.subscribe(':building/kitchen/:measurement', client1, 'client#1;sub#6'); - subscriptionRegistry.subscribe(':building/:room/:measurement', client1, 'client#1;sub#7'); - - subscriptionRegistry.subscribe('myhome/livingroom/temperature', client2, 'client#2;sub#1'); - subscriptionRegistry.subscribe('myhome/livingroom/:measurement', client2, 'client#2;sub#2'); - subscriptionRegistry.subscribe('myhome/kitchen/:measurement', client2, 'client#2;sub#3'); - subscriptionRegistry.subscribe('myhome/:room/temperature', client2, 'client#2;sub#4'); - subscriptionRegistry.subscribe('myhome/:room/:measurement', client2, 'client#2;sub#5'); - subscriptionRegistry.subscribe(':building/kitchen/:measurement', client2, 'client#2;sub#6'); - subscriptionRegistry.subscribe(':building/:room/:measurement', client2, 'client#2;sub#7'); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'client#1;sub#1', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'client#1;sub#2', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/kitchen/:measurement', 'client#1;sub#3', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'client#1;sub#4', client1)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement', 'client#1;sub#5', client1)); + subscriptionRegistry.register(new TopicSubscription(':building/kitchen/:measurement', 'client#1;sub#6', client1)); + subscriptionRegistry.register(new TopicSubscription(':building/:room/:measurement', 'client#1;sub#7', client1)); + + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/temperature', 'client#2;sub#1', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/livingroom/:measurement', 'client#2;sub#2', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/kitchen/:measurement', 'client#2;sub#3', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/temperature', 'client#2;sub#4', client2)); + subscriptionRegistry.register(new TopicSubscription('myhome/:room/:measurement', 'client#2;sub#5', client2)); + subscriptionRegistry.register(new TopicSubscription(':building/kitchen/:measurement', 'client#2;sub#6', client2)); + subscriptionRegistry.register(new TopicSubscription(':building/:room/:measurement', 'client#2;sub#7', client2)); // Resolve the subscribers which observe the topic 'myhome/kitchen/temperature'. - const destinations = subscriptionRegistry.resolveTopicDestinations('myhome/kitchen/temperature'); - - expect(destinations.map(destination => destination.subscription.subscriberId)).toEqual([ + const subscribers = subscriptionRegistry.subscriptions({topic: 'myhome/kitchen/temperature'}); + expect(subscribers.map(subscription => subscription.subscriberId)).toEqual([ 'client#1;sub#3', 'client#1;sub#4', 'client#1;sub#5', @@ -401,96 +367,56 @@ describe('TopicSubscriptionRegistry', () => { 'client#2;sub#7', ]); - expect(destinations[0]).withContext('(client 1)(a)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#1;sub#3', - topic: 'myhome/kitchen/:measurement', - client: client1, - }, - }); - expect(destinations[1]).withContext('(client 1)(b)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('room', 'kitchen'), - subscription: { - subscriberId: 'client#1;sub#4', - topic: 'myhome/:room/temperature', - client: client1, - }, - }); - expect(destinations[2]).withContext('(client 1)(c)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('room', 'kitchen').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#1;sub#5', - topic: 'myhome/:room/:measurement', - client: client1, - }, - }); - expect(destinations[3]).withContext('(client 1)(d)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('building', 'myhome').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#1;sub#6', - topic: ':building/kitchen/:measurement', - client: client1, - }, - }); - expect(destinations[4]).withContext('(client 1)(e)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('building', 'myhome').set('room', 'kitchen').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#1;sub#7', - topic: ':building/:room/:measurement', - client: client1, - }, - }); - expect(destinations[5]).withContext('(client 2)(a)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#2;sub#3', - topic: 'myhome/kitchen/:measurement', - client: client2, - }, - }); - expect(destinations[6]).withContext('(client 2)(b)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('room', 'kitchen'), - subscription: { - subscriberId: 'client#2;sub#4', - topic: 'myhome/:room/temperature', - client: client2, - }, - }); - expect(destinations[7]).withContext('(client 2)(c)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('room', 'kitchen').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#2;sub#5', - topic: 'myhome/:room/:measurement', - client: client2, - }, - }); - expect(destinations[8]).withContext('(client 2)(d)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('building', 'myhome').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#2;sub#6', - topic: ':building/kitchen/:measurement', - client: client2, - }, - }); - expect(destinations[9]).withContext('(client 2)(e)').toEqual({ - topic: 'myhome/kitchen/temperature', - params: new Map().set('building', 'myhome').set('room', 'kitchen').set('measurement', 'temperature'), - subscription: { - subscriberId: 'client#2;sub#7', - topic: ':building/:room/:measurement', - client: client2, - }, - }); + expect(subscribers[0]).withContext('(client 1)(a)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#3', + topic: 'myhome/kitchen/:measurement', + client: client1, + })); + expect(subscribers[1]).withContext('(client 1)(b)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#4', + topic: 'myhome/:room/temperature', + client: client1, + })); + expect(subscribers[2]).withContext('(client 1)(c)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#5', + topic: 'myhome/:room/:measurement', + client: client1, + })); + expect(subscribers[3]).withContext('(client 1)(d)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#6', + topic: ':building/kitchen/:measurement', + client: client1, + })); + expect(subscribers[4]).withContext('(client 1)(e)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#1;sub#7', + topic: ':building/:room/:measurement', + client: client1, + })); + expect(subscribers[5]).withContext('(client 2)(a)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#3', + topic: 'myhome/kitchen/:measurement', + client: client2, + })); + expect(subscribers[6]).withContext('(client 2)(b)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#4', + topic: 'myhome/:room/temperature', + client: client2, + })); + expect(subscribers[7]).withContext('(client 2)(c)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#5', + topic: 'myhome/:room/:measurement', + client: client2, + })); + expect(subscribers[8]).withContext('(client 2)(d)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#6', + topic: ':building/kitchen/:measurement', + client: client2, + })); + expect(subscribers[9]).withContext('(client 2)(e)').toEqual(jasmine.objectContaining({ + subscriberId: 'client#2;sub#7', + topic: ':building/:room/:measurement', + client: client2, + })); }); function expectSubscriptionCount(topic: string): {toBe: (expected: number) => Promise} { diff --git a/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.ts b/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.ts index 8de8b871..133f6b40 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/message-broker/topic-subscription.registry.ts @@ -8,82 +8,24 @@ * SPDX-License-Identifier: EPL-2.0 */ -import {Defined} from '@scion/toolkit/util'; import {TopicMatcher} from '../../topic-matcher.util'; -import {Observable, Subject} from 'rxjs'; -import {distinctUntilChanged, filter, map, startWith, takeUntil} from 'rxjs/operators'; -import {ClientRegistry} from '../client-registry/client.registry'; -import {Beans, PreDestroy} from '@scion/toolkit/bean-manager'; -import {runSafe} from '../../safe-runner'; import {Client} from '../client-registry/client'; +import {MessageSubscription, MessageSubscriptionRegistry} from './message-subscription.registry'; +import {concatWith, defer, merge, mergeMap, Observable, of} from 'rxjs'; +import {distinctUntilChanged, map} from 'rxjs/operators'; /** * Central point for managing topic subscriptions. * * @ignore */ -export class TopicSubscriptionRegistry implements PreDestroy { - - private readonly _destroy$ = new Subject(); - private readonly _subscriptionRegistry = new Map(); - private readonly _subscriptionChange$ = new Subject(); - - constructor() { - Beans.get(ClientRegistry).unregister$ - .pipe(takeUntil(this._destroy$)) - .subscribe((client: Client) => runSafe(() => { - this.unsubscribeClient(client.id); - })); - } - - /** - * Subscribes the subscriber of given identity to receive messages sent to the given topic. - * - * After calling this method, messages that are published on that topic are transported to the - * subscribing client until {@link unsubscribe} or {@link unsubscribeClient} is called. - * - * @param topic - The topic which to observe; it allows using wildcard segments, e.g., `person/:id`. - * @param client - The client which subscribes to the topic. - * @param subscriberId - Unique identify of the subscriber. - */ - public subscribe(topic: string, client: Client, subscriberId: string): void { - Defined.orElseThrow(subscriberId, () => Error('[TopicSubscribeError] SubscriberId required')); - Defined.orElseThrow(topic, () => Error('[TopicSubscribeError] Topic required')); - Defined.orElseThrow(client, () => Error('[TopicSubscribeError] Client required')); - - this._subscriptionRegistry.set(subscriberId, {subscriberId, topic, client}); - this._subscriptionChange$.next({topic}); - } - - /** - * Unsubscribes a subscriber; has no effect if not registered. - * - * @param subscriberId - Unique identify of the subscriber. - */ - public unsubscribe(subscriberId: string): void { - Defined.orElseThrow(subscriberId, () => Error('[TopicUnsubscribeError] SubscriberId required')); - - const subscription = this._subscriptionRegistry.get(subscriberId); - if (subscription) { - this._subscriptionRegistry.delete(subscriberId); - this._subscriptionChange$.next({topic: subscription.topic}); - } - } +export class TopicSubscriptionRegistry extends MessageSubscriptionRegistry { /** - * Unregisters all subscriptions of a client. - * - * @param clientId - Identifies the client which should be unsubscribed from all its topics. + * @inheritDoc */ - private unsubscribeClient(clientId: string): void { - Defined.orElseThrow(clientId, () => Error('[TopicUnsubscribeError] ClientId required')); - - const subscriptions = this.subscriptions.filter(subscription => subscription.client.id === clientId); - // First, remove all subscriptions, then, notify about topic subscription change. This order is relevant if - // the client has subscribed to a topic multiple times, allowing {@link TopicSubscriptionRegistry#subscriptionCount$} - // to emit only once. - subscriptions.forEach(subscription => this._subscriptionRegistry.delete(subscription.subscriberId)); - subscriptions.forEach(subscription => this._subscriptionChange$.next({topic: subscription.topic})); + public override subscriptions(filter?: {subscriberId?: string; clientId?: string; appSymbolicName?: string; topic?: string}): TopicSubscription[] { + return super.subscriptions(filter).filter(subscription => filter?.topic ? subscription.matches(filter.topic) : true); } /** @@ -98,85 +40,34 @@ export class TopicSubscriptionRegistry implements PreDestroy { throw Error(`[TopicObserveError] Observing the number of subscribers is only allowed on exact topics. Exact topics must not contain wildcard segments. [topic='${topic}']`); } - return this._subscriptionChange$ + const subscriptions$ = defer(() => of(this.subscriptions({topic}))); + const subscriptionsChange$ = merge(this.register$, this.unregister$); + + return subscriptions$ .pipe( - startWith({topic}), - filter(subscriptionChange => new TopicMatcher(subscriptionChange.topic).match(topic).matches), - map(() => this.resolveTopicDestinations(topic).length), + concatWith(subscriptionsChange$.pipe(mergeMap(() => subscriptions$))), + map(subscriptions => subscriptions.length), distinctUntilChanged(), ); } - - /** - * Resolves the destinations to which to transport a message published to the given topic. - * - * A client can have multiple subscriptions that match the topic; in this case, multiple destinations are returned, one - * per subscription. Use the subscription id to map the destination to the subscription. - */ - public resolveTopicDestinations(publishTopic: string): ResolvedTopicDestination[] { - return this.subscriptions.reduce((resolvedTopicDestinations: ResolvedTopicDestination[], subscription: TopicSubscription) => { - const match = new TopicMatcher(subscription.topic).match(publishTopic); - if (match.matches) { - return resolvedTopicDestinations.concat({subscription, topic: publishTopic, params: match.params!}); - } - return resolvedTopicDestinations; - }, [] as ResolvedTopicDestination[]); - } - - private get subscriptions(): TopicSubscription[] { - return Array.from(this._subscriptionRegistry.values()); - } - - public preDestroy(): void { - this._destroy$.next(); - } } /** * Represents a subscription on a topic. The topic may contain wildcard segments. + * * @ignore */ -export interface TopicSubscription { - /** - * Unique identify of the subscriber. - */ - subscriberId: string; - /** - * Topic subscribed by the subscriber; if subscribed to multiple topics (using the colon syntax), - * the resolved segment values are contained in the params map. - */ - topic: string; - /** - * The client which subscribed to the topic. - */ - client: Client; -} +export class TopicSubscription implements MessageSubscription { + + constructor(public readonly topic: string, + public readonly subscriberId: string, + public readonly client: Client) { + } -/** - * Represents the actual destination to which to transport a topic message. - * @ignore - */ -export interface ResolvedTopicDestination { - /** - * Exact topic to which the message was published. - */ - topic: string; - /** - * Contains the resolved values of the wildcard segments as specified in the subscription topic. - * For example: If subscribed to the topic `person/:id` and a message is published to the topic `person/5`, - * the resolved id with the value `5` is contained in the params map. - */ - params: Map; /** - * The actual subscription to which to transport the message. + * Tests whether the given topic matches this subscription. */ - subscription: TopicSubscription; -} - -/** - * Event emitted when some subscriber subscribes or unsubscribes on a topic. - * @ignore - */ -interface SubscriptionChangeEvent { - topic: string; + public matches(topic: string): boolean { + return new TopicMatcher(this.topic).match(topic).matches; + } } diff --git a/projects/scion/microfrontend-platform/src/lib/intent-subscription-legacy-support.ts b/projects/scion/microfrontend-platform/src/lib/intent-subscription-legacy-support.ts new file mode 100644 index 00000000..e92cd361 --- /dev/null +++ b/projects/scion/microfrontend-platform/src/lib/intent-subscription-legacy-support.ts @@ -0,0 +1,41 @@ +import {filter} from 'rxjs/operators'; +import {Client} from './host/client-registry/client'; +import {Beans} from '@scion/toolkit/bean-manager'; +import {VERSION} from './version'; +import {UUID} from '@scion/toolkit/uuid'; +import {ClientRegistry} from './host/client-registry/client.registry'; +import {IntentSubscription, IntentSubscriptionRegistry} from './host/message-broker/intent-subscription.registry'; +import {MessageHeaders} from './messaging.model'; +import {Logger, LoggingContext} from './logger'; +import {semver} from './host/semver'; + +/** + * Provides legacy intent subscription support for clients older than version 1.0.0-rc.8. + * + * @deprecated since version 1.0.0-rc.8; Legacy support will be removed in version 1.0.0. + * @internal + */ +export namespace IntentSubscriptionLegacySupport { + + const VERSION_NEW_API = '1.0.0-rc.8'; + + export function subscriberIdMessageHeaderName(clientVersion: string): string { + if (semver.lt(clientVersion, VERSION_NEW_API)) { + return 'ɵTOPIC_SUBSCRIBER_ID'; + } + return MessageHeaders.ɵSubscriberId; + } + + /** + * Provides legacy intent subscription support for clients older than version 1.0.0-rc.8. + */ + export function installLegacyClientIntentSubscription(): void { + Beans.get(ClientRegistry).register$ + .pipe(filter(client => semver.lt(client.version, VERSION_NEW_API))) + .subscribe((legacyClient: Client) => { + const legacyClientSubscription = new IntentSubscription({}, UUID.randomUUID(), legacyClient); + Beans.get(IntentSubscriptionRegistry).register(legacyClientSubscription); + Beans.get(Logger).warn(`[DEPRECATION][FE93C94] Application "${legacyClient.application.symbolicName}" is using a legacy protocol for subscribing to intents. Please update @scion/microfrontend-platform to version '${Beans.get(VERSION)}'.`, new LoggingContext(legacyClient.application.symbolicName, legacyClient.version)); + }); + } +} diff --git a/projects/scion/microfrontend-platform/src/lib/messaging.model.ts b/projects/scion/microfrontend-platform/src/lib/messaging.model.ts index 1c8b4a78..9f7a7137 100644 --- a/projects/scion/microfrontend-platform/src/lib/messaging.model.ts +++ b/projects/scion/microfrontend-platform/src/lib/messaging.model.ts @@ -26,9 +26,9 @@ export interface Message { } /** - * Represents an intent issued by an application. + * Represents an intent sent by an application. * - * The intent is transported to all clients that provide a satisfying capability visible to the issuing application. + * The intent is transported to applications that provide a fulfilling capability visible to the sending application. * * @category Messaging */ @@ -178,7 +178,7 @@ export enum MessageHeaders { * * @internal */ - ɵTopicSubscriberId = 'ɵTOPIC_SUBSCRIBER_ID', + ɵSubscriberId = 'ɵSUBSCRIBER_ID', } /** diff --git a/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts b/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts index a01f6b55..02eff58a 100644 --- a/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts +++ b/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts @@ -57,11 +57,12 @@ import {ɵClientRegistry} from './host/client-registry/ɵclient.registry'; import {IntentInterceptor} from './host/message-broker/message-interception'; import {MicrofrontendIntentNavigator} from './host/router/microfrontend-intent-navigator.interceptor'; import {IntentParamValidator} from './host/message-broker/intent-param-validator.interceptor'; +import {IntentSubscriptionRegistry} from './host/message-broker/intent-subscription.registry'; /** * Current version of the SCION Microfrontend Platform. */ -const version = '1.0.0-rc.7'; +const version = '1.0.0-rc.8'; /** * **SCION Microfrontend Platform is a TypeScript-based open-source library that helps to implement a microfrontend architecture.** @@ -172,6 +173,7 @@ export class MicrofrontendPlatform { Beans.register(MouseUpEventDispatcher, {eager: true}); Beans.register(MessageBroker, {destroyOrder: BeanDestroyOrders.BROKER}); Beans.register(TopicSubscriptionRegistry, {destroyOrder: BeanDestroyOrders.BROKER}); + Beans.register(IntentSubscriptionRegistry, {destroyOrder: BeanDestroyOrders.BROKER}); Beans.registerIfAbsent(OutletRouter); Beans.registerIfAbsent(RelativePathResolver); Beans.registerIfAbsent(RouterOutletUrlAssigner); diff --git a/projects/scion/microfrontend-platform/src/lib/operators.ts b/projects/scion/microfrontend-platform/src/lib/operators.ts index 55b1c523..9ec36bf7 100644 --- a/projects/scion/microfrontend-platform/src/lib/operators.ts +++ b/projects/scion/microfrontend-platform/src/lib/operators.ts @@ -49,10 +49,17 @@ export function filterByOrigin(origin: string): MonoTypeOperatorFunction(header: {key: string; value: any}): MonoTypeOperatorFunction>> { +export function filterByWindow(window: Window): MonoTypeOperatorFunction { + return filter((event: MessageEvent): boolean => { + return event.source === window; + }); +} + +/** @ignore */ +export function filterByMessageHeader(header: {name: string; value: any}): MonoTypeOperatorFunction>> { return filter((event: MessageEvent>): boolean => { const messageHeaders = event.data.message.headers; - return messageHeaders.has(header.key) && messageHeaders.get(header.key) === header.value; + return messageHeaders.has(header.name) && messageHeaders.get(header.name) === header.value; }); } diff --git a/projects/scion/microfrontend-platform/src/lib/version.spec.ts b/projects/scion/microfrontend-platform/src/lib/version.spec.ts index f94672e5..01dcbc72 100644 --- a/projects/scion/microfrontend-platform/src/lib/version.spec.ts +++ b/projects/scion/microfrontend-platform/src/lib/version.spec.ts @@ -71,7 +71,7 @@ describe('MicrofrontendPlatform', () => { }); it('should warn if client does not support heartbeat introduced in version "1.0.0-rc.1"', async () => { - setHostAppMicrofrontendPlatformVersion('1.0.0-rc.3'); + setHostAppMicrofrontendPlatformVersion('1.0.0'); await MicrofrontendPlatform.startHost({ applications: [ @@ -87,7 +87,7 @@ describe('MicrofrontendPlatform', () => { // Assert version mismatch warning expect(readConsoleLog('warn', {filter: /\[VersionMismatch]/})).toEqual(jasmine.arrayContaining([ - `[VersionMismatch] Since '@scion/microfrontend-platform@1.0.0-rc.1', connected clients must send a heartbeat to indicate liveness. Please upgrade @scion/microfrontend-platform of application 'client' from version '1.0.0-beta.20' to version '1.0.0-rc.3'.`, + `[VersionMismatch] Since '@scion/microfrontend-platform@1.0.0-rc.1', connected clients must send a heartbeat to indicate liveness. Please upgrade @scion/microfrontend-platform of application 'client' from version '1.0.0-beta.20' to version '1.0.0'.`, ])); }); diff --git "a/projects/scion/microfrontend-platform/src/lib/\311\265messaging.model.ts" "b/projects/scion/microfrontend-platform/src/lib/\311\265messaging.model.ts" index 945c65d7..bf691a95 100644 --- "a/projects/scion/microfrontend-platform/src/lib/\311\265messaging.model.ts" +++ "b/projects/scion/microfrontend-platform/src/lib/\311\265messaging.model.ts" @@ -8,6 +8,7 @@ * SPDX-License-Identifier: EPL-2.0 */ import {Message} from './messaging.model'; +import {IntentSelector} from './client/messaging/intent-client'; /** * Declares the message transports. @@ -40,11 +41,19 @@ export enum MessagingChannel { */ TopicUnsubscribe = 'topic-unsubscribe', /** - * Channel to publish and dispatch topic-related messages. + * Channel for clients to subscribe to intents. + */ + IntentSubscribe = 'intent-subscribe', + /** + * Channel for clients to unsubscribe from intents. + */ + IntentUnsubscribe = 'intent-unsubscribe', + /** + * Channel for the host to transport topic message to subscribed clients. */ Topic = 'topic', /** - * Channel to publish and dispatch intents. + * Channel for the host to transport intent messages to subscribed clients. */ Intent = 'intent', /** @@ -90,6 +99,7 @@ export namespace PlatformTopics { * Allows reading the registered applications from this retained topic. */ export const Applications = 'ɵAPPLICATIONS'; + /** * Computes the topic where a client can publish its heartbeat. */ @@ -114,18 +124,28 @@ export interface ConnackMessage { heartbeatInterval?: number; } -export interface TopicSubscribeCommand extends Message { +export interface SubscribeCommand extends Message { /** * Unique identify of the subscriber. */ subscriberId: string; +} + +export interface TopicSubscribeCommand extends SubscribeCommand { /** - * Topic to which to subscribe. + * Topic to subscribe. */ topic: string; } -export interface TopicUnsubscribeCommand extends Message { +export interface IntentSubscribeCommand extends SubscribeCommand { + /** + * Selects intents that match the specified selector and for which the application provides a fulfilling capability. + */ + selector?: IntentSelector; +} + +export interface UnsubscribeCommand extends Message { /** * Unique identify of the subscriber. */