From d0eeaf5aef0bc4cfc39f3de2e8443f5a1988e1ea Mon Sep 17 00:00:00 2001 From: danielwiehl Date: Wed, 3 Feb 2021 01:15:03 +0100 Subject: [PATCH] feat(platform): add convenience API to reduce code required to respond to requests This commits adds the methods `onMessage` to the 'MessageClient` and `onIntent` to the `IntentClient`. Unlike `observe$`, messages are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code required to respond to requests. closes #43 BREAKING CHANGE: Adding the convenience API for responding to requests introduced the following breaking change for Angular projects. > Note: The messaging protocol between the host and client HAS NOT CHANGED. Thus, you can upgrade the host and clients to the new version independently. To migrate: - If an Angular project, add the method `onMessage` to your NgZone message client decorator, as following: ```typescript public onMessage(topic: string, onMessage: (message: TopicMessage) => Observable | Promise | OUT | void): Subscription { return messageClient.onMessage(topic, onMessage); } ``` See https://scion-microfrontend-platform-developer-guide.now.sh/#chapter:angular-integration-guide:preparing-messaging-for-use-with-angular for more information. - If an Angular project, add the method `onIntent` to your NgZone intent client decorator, as following: ```typescript public onIntent(selector: IntentSelector, onIntent: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription { return intentClient.onIntent(selector, onIntent); } ``` See https://scion-microfrontend-platform-developer-guide.now.sh/#chapter:angular-integration-guide:preparing-messaging-for-use-with-angular for more information. --- .../src/app/ng-zone-decorators.ts | 12 +- .../src/app/ng-zone-decorators.ts | 12 +- .../intent-based-communication.snippets.ts | 27 +- .../intent-based-messaging.adoc | 13 + .../topic-based-communication.snippets.ts | 20 +- .../topic-based-messaging.adoc | 14 + ...-zone-message-client-decorator.snippets.ts | 12 +- .../src/lib/client/messaging/intent-client.ts | 90 +- .../lib/client/messaging/message-client.ts | 89 +- .../client/messaging/message-handler.spec.ts | 1062 +++++++++++++++++ .../lib/client/messaging/message-handler.ts | 101 ++ .../messaging/\311\265intent-client.ts" | 79 ++ .../messaging/\311\265message-client.ts" | 76 ++ .../src/lib/error.util.ts | 19 + .../src/lib/host/application-registry.spec.ts | 2 +- .../\311\265manifest-registry.ts" | 19 +- .../src/lib/microfrontend-platform.ts | 6 +- .../src/lib/safe-runner.ts | 13 +- 18 files changed, 1512 insertions(+), 154 deletions(-) create mode 100644 projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.spec.ts create mode 100644 projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.ts create mode 100644 "projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" create mode 100644 "projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265message-client.ts" create mode 100644 projects/scion/microfrontend-platform/src/lib/error.util.ts diff --git a/apps/microfrontend-platform-devtools/src/app/ng-zone-decorators.ts b/apps/microfrontend-platform-devtools/src/app/ng-zone-decorators.ts index 3de35dbf..4b1e1a6a 100644 --- a/apps/microfrontend-platform-devtools/src/app/ng-zone-decorators.ts +++ b/apps/microfrontend-platform-devtools/src/app/ng-zone-decorators.ts @@ -9,9 +9,9 @@ */ import { BeanDecorator } from '@scion/toolkit/bean-manager'; -import { Intent, IntentClient, IntentMessage, IntentOptions, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform'; +import { Intent, IntentClient, IntentMessage, IntentOptions, IntentSelector, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform'; import { Injectable, NgZone } from '@angular/core'; -import { MonoTypeOperatorFunction, Observable, pipe } from 'rxjs'; +import { MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs'; import { observeInside, subscribeInside } from '@scion/toolkit/operators'; /** @@ -39,6 +39,10 @@ export class NgZoneMessageClientDecorator implements BeanDecorator(topic).pipe(synchronizeWithAngular(zone)); } + public onMessage(topic: string, callback: (message: TopicMessage) => Observable | Promise | OUT | void): Subscription { + return messageClient.onMessage(topic, callback); + } + public subscriberCount$(topic: string): Observable { return messageClient.subscriberCount$(topic).pipe(synchronizeWithAngular(zone)); } @@ -70,6 +74,10 @@ export class NgZoneIntentClientDecorator implements BeanDecorator public observe$(selector?: Intent): Observable> { return intentClient.observe$(selector).pipe(synchronizeWithAngular(zone)); } + + public onIntent(selector: IntentSelector, callback: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription { + return intentClient.onIntent(selector, callback); + } }; } } diff --git a/apps/microfrontend-platform-testing-app/src/app/ng-zone-decorators.ts b/apps/microfrontend-platform-testing-app/src/app/ng-zone-decorators.ts index 3de35dbf..4b1e1a6a 100644 --- a/apps/microfrontend-platform-testing-app/src/app/ng-zone-decorators.ts +++ b/apps/microfrontend-platform-testing-app/src/app/ng-zone-decorators.ts @@ -9,9 +9,9 @@ */ import { BeanDecorator } from '@scion/toolkit/bean-manager'; -import { Intent, IntentClient, IntentMessage, IntentOptions, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform'; +import { Intent, IntentClient, IntentMessage, IntentOptions, IntentSelector, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform'; import { Injectable, NgZone } from '@angular/core'; -import { MonoTypeOperatorFunction, Observable, pipe } from 'rxjs'; +import { MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs'; import { observeInside, subscribeInside } from '@scion/toolkit/operators'; /** @@ -39,6 +39,10 @@ export class NgZoneMessageClientDecorator implements BeanDecorator(topic).pipe(synchronizeWithAngular(zone)); } + public onMessage(topic: string, callback: (message: TopicMessage) => Observable | Promise | OUT | void): Subscription { + return messageClient.onMessage(topic, callback); + } + public subscriberCount$(topic: string): Observable { return messageClient.subscriberCount$(topic).pipe(synchronizeWithAngular(zone)); } @@ -70,6 +74,10 @@ export class NgZoneIntentClientDecorator implements BeanDecorator public observe$(selector?: Intent): Observable> { return intentClient.observe$(selector).pipe(synchronizeWithAngular(zone)); } + + public onIntent(selector: IntentSelector, callback: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription { + return intentClient.onIntent(selector, callback); + } }; } } diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-communication.snippets.ts b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-communication.snippets.ts index 6dee9deb..abba0e35 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-communication.snippets.ts +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-communication.snippets.ts @@ -1,7 +1,7 @@ import { Intent, IntentClient, IntentMessage, IntentSelector, MessageClient, MessageHeaders, OutletRouter, PRIMARY_OUTLET, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform'; import { Subject } from 'rxjs'; import { Beans } from '@scion/toolkit/bean-manager'; -import { take } from 'rxjs/operators'; +import { map, take } from 'rxjs/operators'; ` // tag::intention-declaration[] @@ -121,7 +121,7 @@ import { take } from 'rxjs/operators'; { const authService = new class { - userAccessToken$ = new Subject(); + public userAccessToken$ = new Subject(); }; // tag::reply[] @@ -155,3 +155,26 @@ import { take } from 'rxjs/operators'; }); // end::reply[] } + +{ + const authService = new class { + public userAccessToken$ = new Subject(); + }; + + // tag::onIntent[] + const selector: IntentSelector = { + type: 'auth', + qualifier: {entity: 'user-access-token'}, + }; + + // Stream data as long as the requestor is subscribed to receive replies. + Beans.get(IntentClient).onIntent(selector, intentMessage => { + return authService.userAccessToken$; + }); + + // Alternatively, you can complete the requestor's Observable with the first reply. + Beans.get(IntentClient).onIntent(selector, intentMessage => { + return authService.userAccessToken$.pipe(take(1)); + }); + // end::onIntent[] +} diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc index 0ec13e30..2dc69e1d 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/intent-based-messaging.adoc @@ -16,6 +16,7 @@ In this Chapter - <> - <> - <> +- <> **** ''' @@ -202,3 +203,15 @@ include::intent-based-communication.snippets.ts[tags=reply] TIP: If streaming data like in the example above, the replier can use the RxJS `takeUntilUnsubscribe` operator of the platform to stop replying when the requestor unsubscribes. +[[chapter:intent-based-messaging:convenience-api-for-handling-intents]] +[discrete] +=== Convenience API for handling messages +The intent client provides the `onIntent` method as a convenience to the `observe$` method. Unlike `observe$`, intents are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code required to respond to requests. + +[source,typescript] +---- +include::intent-based-communication.snippets.ts[tags=onIntent] +---- + +For each intent received, the specified callback function is called. When used in request-response communication, the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for streaming data. In either case, when the final response is produced, the handler terminates the communication, completing the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is transported to the requestor, erroring the requestor's Observable. + diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-communication.snippets.ts b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-communication.snippets.ts index 61b04ec7..6ef80bef 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-communication.snippets.ts +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-communication.snippets.ts @@ -1,7 +1,7 @@ import { MessageClient, MessageHeaders, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform'; import { Subject } from 'rxjs'; import { Beans } from '@scion/toolkit/bean-manager'; -import { take } from 'rxjs/operators'; +import { map, take } from 'rxjs/operators'; { // tag::publish[] @@ -102,3 +102,21 @@ import { take } from 'rxjs/operators'; }); // end::reply[] } + +{ + const sensor$ = new Subject(); + + // tag::onMessage[] + const topic: string = 'myhome/livingroom/temperature'; + + // Stream data as long as the requestor is subscribed to receive replies. + Beans.get(MessageClient).onMessage(topic, message => { + return sensor$.pipe(map(temperature => `${temperature} °C`)); + }); + + // Alternatively, you can complete the requestor's Observable with the first reply. + Beans.get(MessageClient).onMessage(topic, message => { + return sensor$.pipe(map(temperature => `${temperature} °C`), take(1)); + }); + // end::onMessage[] +} diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-messaging.adoc b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-messaging.adoc index aff216f4..67fedcef 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-messaging.adoc +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/core-concepts/cross-application-communication/topic-based-messaging.adoc @@ -17,6 +17,7 @@ In this Chapter - <> - <> - <> +- <> **** @@ -159,3 +160,16 @@ include::topic-based-communication.snippets.ts[tags=reply] TIP: If streaming data like in the example above, the replier can use the RxJS `takeUntilUnsubscribe` operator of the platform to stop replying when the requestor unsubscribes. + +[[chapter:topic-based-messaging:convenience-api-for-handling-messages]] +[discrete] +=== Convenience API for handling messages +The message client provides the `onMessage` method as a convenience to the `observe$` method. Unlike `observe$`, messages are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code required to respond to requests. + +[source,typescript] +---- +include::topic-based-communication.snippets.ts[tags=onMessage] +---- + +For each message received, the specified callback function is called. When used in request-response communication, the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for streaming data. In either case, when the final response is produced, the handler terminates the communication, completing the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is transported to the requestor, erroring the requestor's Observable. + diff --git a/docs/adoc/microfrontend-platform-developer-guide/chapters/miscellaneous/angular-integration-guide/angular-zone-message-client-decorator.snippets.ts b/docs/adoc/microfrontend-platform-developer-guide/chapters/miscellaneous/angular-integration-guide/angular-zone-message-client-decorator.snippets.ts index 76ecee50..6272eb60 100644 --- a/docs/adoc/microfrontend-platform-developer-guide/chapters/miscellaneous/angular-integration-guide/angular-zone-message-client-decorator.snippets.ts +++ b/docs/adoc/microfrontend-platform-developer-guide/chapters/miscellaneous/angular-integration-guide/angular-zone-message-client-decorator.snippets.ts @@ -1,6 +1,6 @@ -import { Intent, IntentClient, IntentMessage, IntentOptions, MessageClient, MicrofrontendPlatform, PlatformState, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform'; +import { Intent, IntentClient, IntentMessage, IntentOptions, IntentSelector, MessageClient, MicrofrontendPlatform, PlatformState, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform'; import { Injectable, NgZone } from '@angular/core'; -import { MonoTypeOperatorFunction, Observable, pipe } from 'rxjs'; +import { MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs'; import { HttpPlatformConfigLoader } from './start-platform-via-initializer.snippets'; import { BeanDecorator, Beans } from '@scion/toolkit/bean-manager'; import { observeInside, subscribeInside } from '@scion/toolkit/operators'; @@ -33,6 +33,10 @@ export class NgZoneMessageClientDecorator implements BeanDecorator(topic).pipe(synchronizeWithAngular(zone)); // <3> } + public onMessage(topic: string, callback: (message: TopicMessage) => Observable | Promise | OUT | void): Subscription { + return messageClient.onMessage(topic, callback); + } + public subscriberCount$(topic: string): Observable { return messageClient.subscriberCount$(topic).pipe(synchronizeWithAngular(zone)); // <3> } @@ -67,6 +71,10 @@ export class NgZoneIntentClientDecorator implements BeanDecorator public observe$(selector?: Intent): Observable> { return intentClient.observe$(selector).pipe(synchronizeWithAngular(zone)); // <3> } + + public onIntent(selector: IntentSelector, callback: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription { + return intentClient.onIntent(selector, callback); + } }; } } diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts index 3d67e365..16a572a0 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/intent-client.ts @@ -8,19 +8,14 @@ * SPDX-License-Identifier: EPL-2.0 */ -import { defer, Observable } from 'rxjs'; -import { Intent, IntentMessage, throwOnErrorStatus, TopicMessage } from '../../messaging.model'; +import { Observable, Subscription } from 'rxjs'; +import { Intent, IntentMessage, TopicMessage } from '../../messaging.model'; import { Qualifier } from '../../platform.model'; -import { BrokerGateway } from './broker-gateway'; -import { MessagingChannel } from '../../ɵmessaging.model'; -import { filterByChannel, pluckMessage } from '../../operators'; -import { filter } from 'rxjs/operators'; -import { matchesIntentQualifier } from '../../qualifier-tester'; /** * Intent client for sending and receiving intents between microfrontends across origins. * - * Intent-based communication enables controlled collaboration between micro applications. It is inspired by the Android platform, + * Intent-based messaging enables controlled collaboration between micro applications, a mechanism known from Android development * where an application can start an activity via an intent (such as sending an email). * * Like topic-based communication, intent-based communication implements the pub/sub (publish/subscribe) messaging pattern, but is, @@ -147,47 +142,30 @@ export abstract class IntentClient { * @return An Observable that emits intents for which this application provides a satisfying capability. It never completes. */ public abstract observe$(selector?: IntentSelector): Observable>; -} - -/** - * @ignore - */ -export class ɵIntentClient implements IntentClient { // tslint:disable-line:class-name - constructor(private readonly _brokerGateway: BrokerGateway) { - } - - public publish(intent: Intent, body?: T, options?: IntentOptions): Promise { - assertIntentQualifier(intent.qualifier, {allowWildcards: false}); - const headers = new Map(options && options.headers); - const intentMessage: IntentMessage = {intent, headers: new Map(headers)}; - setBodyIfDefined(intentMessage, body); - return this._brokerGateway.postMessage(MessagingChannel.Intent, intentMessage); - } - - public request$(intent: Intent, body?: any, options?: IntentOptions): Observable> { - assertIntentQualifier(intent.qualifier, {allowWildcards: false}); - // IMPORTANT: - // When sending a request, the platform adds various headers to the message. Therefore, to support multiple subscriptions - // to the returned Observable, each subscription must have its individual message instance and headers map. - // In addition, the headers are copied to prevent modifications before the effective subscription. - const headers = new Map(options && options.headers); - return defer(() => { - const intentMessage: IntentMessage = {intent, headers: new Map(headers)}; - setBodyIfDefined(intentMessage, body); - return this._brokerGateway.requestReply$(MessagingChannel.Intent, intentMessage).pipe(throwOnErrorStatus()); - }); - } - - public observe$(selector?: IntentSelector): Observable> { - return this._brokerGateway.message$ - .pipe( - filterByChannel>(MessagingChannel.Intent), - pluckMessage(), - filter(message => !selector || !selector.type || selector.type === message.intent.type), - filter(message => !selector || !selector.qualifier || matchesIntentQualifier(selector.qualifier, message.intent.qualifier)), - ); - } + /** + * Convenience API for handling intents. + * + * Unlike `observe$`, intents are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly + * from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code + * required to respond to requests. + * + * For each intent received, the specified callback function is called. When used in request-response communication, + * the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise + * allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for + * streaming data. In either case, when the final response is produced, the handler terminates the communication, completing + * the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is + * transported to the requestor, erroring the requestor's Observable. + * + * @param selector - Allows filtering intents. + * For more information, see the API description of {@link observe$}. + * @param callback - Specifies the callback to be called for each intent. When used in request-response communication, + * the callback function can return the response either directly or in the form of a Promise or Observable. If returning + * a response in fire-and-forget communication, it is ignored. Throwing an error in the callback does not unregister the callback. + * @return Subscription to unregister the callback. Calling {@link Subscription.unsubscribe} will complete the Observable of all + * requestors, if any. + */ + public abstract onIntent(selector: IntentSelector, callback: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription; } /** @@ -218,19 +196,3 @@ export interface IntentSelector { */ qualifier?: Qualifier; } - -function assertIntentQualifier(qualifier: Qualifier, options: { allowWildcards: boolean }): void { - if (!qualifier || Object.keys(qualifier).length === 0) { - return; - } - - if (!options.allowWildcards && Object.entries(qualifier).some(([key, value]) => key === '*' || value === '*' || value === '?')) { - throw Error(`[IllegalQualifierError] Qualifier must not contain wildcards. [qualifier='${JSON.stringify(qualifier)}']`); - } -} - -function setBodyIfDefined(message: TopicMessage | IntentMessage, body?: T): void { - if (body !== undefined) { - message.body = body; - } -} diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/message-client.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/message-client.ts index 01b1fda4..e3d20abd 100644 --- a/projects/scion/microfrontend-platform/src/lib/client/messaging/message-client.ts +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/message-client.ts @@ -7,13 +7,9 @@ * * SPDX-License-Identifier: EPL-2.0 */ -import { defer, MonoTypeOperatorFunction, Observable } from 'rxjs'; -import { IntentMessage, mapToBody, throwOnErrorStatus, TopicMessage } from '../../messaging.model'; +import { MonoTypeOperatorFunction, Observable, Subscription } from 'rxjs'; +import { TopicMessage } from '../../messaging.model'; import { first, takeUntil } from 'rxjs/operators'; -import { BrokerGateway } from './broker-gateway'; -import { Defined } from '@scion/toolkit/util'; -import { MessagingChannel, PlatformTopics } from '../../ɵmessaging.model'; -import { TopicMatcher } from '../../topic-matcher.util'; import { AbstractType, Beans, Type } from '@scion/toolkit/bean-manager'; /** @@ -122,6 +118,30 @@ export abstract class MessageClient { */ public abstract observe$(topic: string): Observable>; + /** + * Convenience API for handling messages. + * + * Unlike `observe$`, messages are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly + * from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code + * required to respond to requests. + * + * For each message received, the specified callback function is called. When used in request-response communication, + * the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise + * allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for + * streaming data. In either case, when the final response is produced, the handler terminates the communication, completing + * the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is + * transported to the requestor, erroring the requestor's Observable. + * + * @param topic - Specifies the topic which to observe. + * For more information, see the API description of {@link observe$}. + * @param callback - Specifies the callback to be called for each message. When used in request-response communication, + * the callback function can return the response either directly or in the form of a Promise or Observable. If returning + * a response in fire-and-forget communication, it is ignored. Throwing an error in the callback does not unregister the callback. + * @return Subscription to unregister the callback. Calling {@link Subscription.unsubscribe} will complete the Observable of all + * requestors, if any. + */ + public abstract onMessage(topic: string, callback: (message: TopicMessage) => Observable | Promise | OUT | void): Subscription; + /** * Allows observing the number of subscriptions on a topic. * @@ -174,60 +194,3 @@ export interface RequestOptions { */ headers?: Map; } - -/** - * @ignore - */ -export class ɵMessageClient implements MessageClient { // tslint:disable-line:class-name - - constructor(private readonly _brokerGateway: BrokerGateway) { - } - - public publish(topic: string, message?: T, options?: PublishOptions): Promise { - assertTopic(topic, {allowWildcardSegments: false}); - const headers = new Map(options && options.headers); - const topicMessage: TopicMessage = {topic, retain: Defined.orElse(options && options.retain, false), headers: new Map(headers)}; - setBodyIfDefined(topicMessage, message); - return this._brokerGateway.postMessage(MessagingChannel.Topic, topicMessage); - } - - public request$(topic: string, request?: any, options?: RequestOptions): Observable> { - assertTopic(topic, {allowWildcardSegments: false}); - // IMPORTANT: - // When sending a request, the platform adds various headers to the message. Therefore, to support multiple subscriptions - // to the returned Observable, each subscription must have its individual message instance and headers map. - // In addition, the headers are copied to prevent modifications before the effective subscription. - const headers = new Map(options && options.headers); - return defer(() => { - const topicMessage: TopicMessage = {topic, retain: false, headers: new Map(headers)}; - setBodyIfDefined(topicMessage, request); - return this._brokerGateway.requestReply$(MessagingChannel.Topic, topicMessage).pipe(throwOnErrorStatus()); - }); - } - - public observe$(topic: string): Observable> { - assertTopic(topic, {allowWildcardSegments: true}); - return this._brokerGateway.subscribeToTopic(topic); - } - - public subscriberCount$(topic: string): Observable { - assertTopic(topic, {allowWildcardSegments: false}); - return this.request$(PlatformTopics.RequestSubscriberCount, topic).pipe(mapToBody()); - } -} - -function assertTopic(topic: string, options: { allowWildcardSegments: boolean }): void { - if (topic === undefined || topic === null || topic.length === 0) { - throw Error('[IllegalTopicError] Topic must not be `null`, `undefined` or empty'); - } - - if (!options.allowWildcardSegments && TopicMatcher.containsWildcardSegments(topic)) { - throw Error(`[IllegalTopicError] Topic not allowed to contain wildcard segments. [topic='${topic}']`); - } -} - -function setBodyIfDefined(message: TopicMessage | IntentMessage, body?: T): void { - if (body !== undefined) { - message.body = body; - } -} diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.spec.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.spec.ts new file mode 100644 index 00000000..8748764f --- /dev/null +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.spec.ts @@ -0,0 +1,1062 @@ +/* + * Copyright (c) 2018-2020 Swiss Federal Railways + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { MessageClient } from '../../client/messaging/message-client'; +import { expectPromise, ObserveCaptor, serveManifest, waitFor, waitForCondition } from '../../spec.util.spec'; +import { MicrofrontendPlatform } from '../../microfrontend-platform'; +import { Beans } from '@scion/toolkit/bean-manager'; +import { ApplicationConfig } from '../../host/platform-config'; +import { IntentMessage, TopicMessage } from '../../messaging.model'; +import { AsyncSubject, concat, Observable, of, ReplaySubject, Subject, throwError } from 'rxjs'; +import { finalize } from 'rxjs/operators'; +import { IntentClient } from './intent-client'; + +const bodyExtractFn = (msg: TopicMessage | IntentMessage): T => msg.body; + +describe('Message Handler', () => { + + beforeEach(async () => await MicrofrontendPlatform.destroy()); + afterEach(async () => await MicrofrontendPlatform.destroy()); + + describe('pub/sub', () => { + + it('should receive messages published to a topic', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(MessageClient).onMessage('topic', message => { + collector.push(message.body); + }); + + await Beans.get(MessageClient).publish('topic', 'A'); + await Beans.get(MessageClient).publish('topic', 'B'); + await Beans.get(MessageClient).publish('topic', 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should not unregister the callback on error', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(MessageClient).onMessage('topic', message => { + collector.push(message.body); + throw Error('some error'); + }); + + await Beans.get(MessageClient).publish('topic', 'A'); + await Beans.get(MessageClient).publish('topic', 'B'); + await Beans.get(MessageClient).publish('topic', 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should not unregister the callback on async error', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(MessageClient).onMessage('topic', async message => { + collector.push(message.body); + await Promise.reject('some-error'); + }); + + await Beans.get(MessageClient).publish('topic', 'A'); + await Beans.get(MessageClient).publish('topic', 'B'); + await Beans.get(MessageClient).publish('topic', 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should ignore values returned by the callback', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(MessageClient).onMessage('topic', message => { + collector.push(message.body); + return 'some-value'; + }); + + await Beans.get(MessageClient).publish('topic', 'A'); + await Beans.get(MessageClient).publish('topic', 'B'); + await Beans.get(MessageClient).publish('topic', 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should ignore async values returned by the callback', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(MessageClient).onMessage('topic', message => { + collector.push(message.body); + return Promise.resolve('some-value'); + }); + + await Beans.get(MessageClient).publish('topic', 'A'); + await Beans.get(MessageClient).publish('topic', 'B'); + await Beans.get(MessageClient).publish('topic', 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should unregister the handler when cancelling its subscription', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + const subscription = Beans.get(MessageClient).onMessage('topic', message => { + collector.push(message.body); + return Promise.resolve('some-value'); + }); + + await Beans.get(MessageClient).publish('topic', 'A'); + await waitForCondition(() => collector.length === 1); + await expect(collector).toEqual(['A']); + + subscription.unsubscribe(); + await Beans.get(MessageClient).publish('topic', 'B'); + await waitFor(1000); + await expect(collector).toEqual(['A']); + }); + }); + + describe('request/response', () => { + + it('should reply with a single response and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + return message.body.toUpperCase(); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with a single response (Promise) and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + return Promise.resolve(message.body.toUpperCase()); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with a single response (Observable) and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + return of(message.body.toUpperCase()); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with multiple responses and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + const body = message.body.toUpperCase(); + return of(`${body}-1`, `${body}-2`, `${body}-3`); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', 'A-3']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with multiple responses without completing the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + const body = message.body.toUpperCase(); + const subject = new ReplaySubject(); + subject.next(`${body}-1`); + subject.next(`${body}-2`); + subject.next(`${body}-3`); + return subject; + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilEmitCount(3); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', 'A-3']); + await waitFor(1000); + await expect(captor.hasCompleted()).toBeFalse(); + await expect(captor.hasErrored()).toBeFalse(); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', 'A-3']); + }); + + it('should immediately complete the requestor\'s Observable when not returning a value', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', () => { + // not returning a value + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual([]); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should immediately complete the requestor\'s Observable when returning `undefined`', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', () => { + return undefined; + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual([]); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should treat `null` as valid reply', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', () => { + return null; + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual([null]); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should ignore `undefined` values, but not `null` values', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + const body = message.body.toUpperCase(); + return of(`${body}-1`, undefined, `${body}-2`, null, `${body}-3`); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', null, 'A-3']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should error the requestor\'s Observable when throwing an error', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', () => { + throw Error('some error'); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual([]); + }); + + it('should error the requestor\'s Observable when returning a Promise that rejects', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', () => { + return Promise.reject('some error'); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual([]); + }); + + it('should error the requestor\'s Observable when returning an Observable that errors', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', () => { + return throwError('some error'); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual([]); + }); + + it('should reply values until encountering an error', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + return concat( + of(message.body.toUpperCase()), + throwError('some error'), + ); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual(['A']); + }); + + it('should not unregister the handler if the replier Observable errors', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(MessageClient).onMessage('topic', message => { + return concat( + of(message.body.toUpperCase()), + throwError('some error'), + ); + }); + + const captor1 = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor1); + + await captor1.waitUntilCompletedOrErrored(); + await expect(captor1.hasErrored()).toBeTrue(); + expect(captor1.getError().name).toEqual('RequestError'); + expect(captor1.getError().message).toEqual('some error'); + expect(captor1.getValues()).toEqual(['A']); + + const captor2 = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'b').subscribe(captor2); + + await captor2.waitUntilCompletedOrErrored(); + await expect(captor2.hasErrored()).toBeTrue(); + expect(captor2.getError().name).toEqual('RequestError'); + expect(captor2.getError().message).toEqual('some error'); + expect(captor2.getValues()).toEqual(['B']); + }); + + it('should not unregister the handler on error', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collected = []; + Beans.get(MessageClient).onMessage('topic', message => { + collected.push(message.body); + throw Error('some error'); + }); + + const captor1 = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor1); + await captor1.waitUntilCompletedOrErrored(); + await expect(captor1.hasErrored()).toBeTrue(); + expect(captor1.getError().name).toEqual('RequestError'); + expect(captor1.getError().message).toEqual('some error'); + expect(captor1.getValues()).toEqual([]); + expect(collected).toEqual(['a']); + + const captor2 = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'b').subscribe(captor2); + await captor2.waitUntilCompletedOrErrored(); + await expect(captor2.hasErrored()).toBeTrue(); + expect(captor2.getError().name).toEqual('RequestError'); + expect(captor2.getError().message).toEqual('some error'); + expect(captor2.getValues()).toEqual([]); + expect(collected).toEqual(['a', 'b']); + }); + + it('should not unregister the handler on async error', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collected = []; + Beans.get(MessageClient).onMessage('topic', message => { + collected.push(message.body); + return Promise.reject('some error'); + }); + + const captor1 = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'a').subscribe(captor1); + await captor1.waitUntilCompletedOrErrored(); + await expect(captor1.hasErrored()).toBeTrue(); + expect(captor1.getError().name).toEqual('RequestError'); + expect(captor1.getError().message).toEqual('some error'); + expect(captor1.getValues()).toEqual([]); + expect(collected).toEqual(['a']); + + const captor2 = new ObserveCaptor(bodyExtractFn); + Beans.get(MessageClient).request$('topic', 'b').subscribe(captor2); + await captor2.waitUntilCompletedOrErrored(); + await expect(captor2.hasErrored()).toBeTrue(); + expect(captor2.getError().name).toEqual('RequestError'); + expect(captor2.getError().message).toEqual('some error'); + expect(captor2.getValues()).toEqual([]); + expect(collected).toEqual(['a', 'b']); + }); + + it('should unsubscribe from the replier Observable when the requestor unsubscribes', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const replierConstruct$ = new Subject(); + const whenReplierConstruct = replierConstruct$.toPromise(); + + const replierTeardown$ = new Subject(); + const whenReplierTeardown = replierTeardown$.toPromise(); + + const replierFinalize$ = new Subject(); + const whenReplierFinalize = replierFinalize$.toPromise(); + + Beans.get(MessageClient).onMessage('topic', () => { + return new Observable(() => { + replierConstruct$.complete(); + return () => replierTeardown$.complete(); + }) + .pipe(finalize(() => replierFinalize$.complete())); + }); + + const subscription = Beans.get(MessageClient).request$('topic').subscribe(); + await expectPromise(whenReplierConstruct).toResolve(); + subscription.unsubscribe(); + await expectPromise(whenReplierTeardown).toResolve(); + await expectPromise(whenReplierFinalize).toResolve(); + }); + + it('should unsubscribe the replier\'s and requestor\'s Observable when unregistering the handler', async () => { + const manifestUrl = serveManifest({name: 'Host App'}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const replierConstruct$ = new AsyncSubject(); + const replierTeardown$ = new AsyncSubject(); + const replierFinalize$ = new AsyncSubject(); + const requestorFinalize$ = new AsyncSubject(); + + const handlerSubscription = Beans.get(MessageClient).onMessage('topic', () => { + return new Observable(() => { + replierConstruct$.complete(); + return () => replierTeardown$.complete(); + }) + .pipe(finalize(() => replierFinalize$.complete())); + }); + + const requestorSubscription = Beans.get(MessageClient).request$('topic') + .pipe(finalize(() => requestorFinalize$.complete())) + .subscribe(); + + await expectPromise(replierConstruct$.toPromise()).toResolve(); + expect(requestorSubscription.closed).toBeFalse(); + + handlerSubscription.unsubscribe(); + await expectPromise(replierTeardown$.toPromise()).toResolve(); + await expectPromise(replierFinalize$.toPromise()).toResolve(); + await expectPromise(requestorFinalize$.toPromise()).toResolve(); + expect(requestorSubscription.closed).toBeTrue(); + }); + }); +}); + +describe('Intent Handler', () => { + + beforeEach(async () => await MicrofrontendPlatform.destroy()); + afterEach(async () => await MicrofrontendPlatform.destroy()); + + describe('pub/sub', () => { + + it('should receive intents', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collector.push(intentMessage.body); + }); + + await Beans.get(IntentClient).publish({type: 'capability'}, 'A'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'B'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should not unregister the callback on error', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collector.push(intentMessage.body); + throw Error('some error'); + }); + + await Beans.get(IntentClient).publish({type: 'capability'}, 'A'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'B'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should not unregister the callback on async error', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(IntentClient).onIntent({type: 'capability'}, async intentMessage => { + collector.push(intentMessage.body); + await Promise.reject('some-error'); + }); + + await Beans.get(IntentClient).publish({type: 'capability'}, 'A'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'B'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should ignore values returned by the callback', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collector.push(intentMessage.body); + return 'some-value'; + }); + + await Beans.get(IntentClient).publish({type: 'capability'}, 'A'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'B'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should ignore async values returned by the callback', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collector.push(intentMessage.body); + return Promise.resolve('some-value'); + }); + + await Beans.get(IntentClient).publish({type: 'capability'}, 'A'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'B'); + await Beans.get(IntentClient).publish({type: 'capability'}, 'C'); + + await waitForCondition(() => collector.length === 3); + await expect(collector).toEqual(['A', 'B', 'C']); + }); + + it('should unregister the handler when cancelling its subscription', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collector = new Array(); + const subscription = Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collector.push(intentMessage.body); + return Promise.resolve('some-value'); + }); + + await Beans.get(IntentClient).publish({type: 'capability'}, 'A'); + await waitForCondition(() => collector.length === 1); + await expect(collector).toEqual(['A']); + + subscription.unsubscribe(); + await Beans.get(IntentClient).publish({type: 'capability'}, 'B'); + await waitFor(1000); + await expect(collector).toEqual(['A']); + }); + }); + + describe('request/response', () => { + + it('should reply with a single response and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + return intentMessage.body.toUpperCase(); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with a single response (Promise) and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + return Promise.resolve(intentMessage.body.toUpperCase()); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with a single response (Observable) and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + return of(intentMessage.body.toUpperCase()); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with multiple responses and then complete the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + const body = intentMessage.body.toUpperCase(); + return of(`${body}-1`, `${body}-2`, `${body}-3`); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', 'A-3']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should reply with multiple responses without completing the requestor\'s Observable', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + const body = intentMessage.body.toUpperCase(); + const subject = new ReplaySubject(); + subject.next(`${body}-1`); + subject.next(`${body}-2`); + subject.next(`${body}-3`); + return subject; + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilEmitCount(3); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', 'A-3']); + await waitFor(1000); + await expect(captor.hasCompleted()).toBeFalse(); + await expect(captor.hasErrored()).toBeFalse(); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', 'A-3']); + }); + + it('should immediately complete the requestor\'s Observable when not returning a value', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + // not returning a value + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual([]); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should immediately complete the requestor\'s Observable when returning `undefined`', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + return undefined; + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual([]); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should treat `null` as valid reply', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + return null; + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual([null]); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should ignore `undefined` values, but not `null` values', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + const body = intentMessage.body.toUpperCase(); + return of(`${body}-1`, undefined, `${body}-2`, null, `${body}-3`); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.getValues()).toEqual(['A-1', 'A-2', null, 'A-3']); + await expect(captor.hasCompleted()).toBeTrue(); + }); + + it('should error the requestor\'s Observable when throwing an error', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + throw Error('some error'); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual([]); + }); + + it('should error the requestor\'s Observable when returning a Promise that rejects', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + return Promise.reject('some error'); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual([]); + }); + + it('should error the requestor\'s Observable when returning an Observable that errors', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + return throwError('some error'); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual([]); + }); + + it('should reply values until encountering an error', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + return concat( + of(intentMessage.body.toUpperCase()), + throwError('some error'), + ); + }); + + const captor = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor); + + await captor.waitUntilCompletedOrErrored(); + await expect(captor.hasErrored()).toBeTrue(); + expect(captor.getError().name).toEqual('RequestError'); + expect(captor.getError().message).toEqual('some error'); + expect(captor.getValues()).toEqual(['A']); + }); + + it('should not unregister the handler if the replier Observable errors', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + return concat( + of(intentMessage.body.toUpperCase()), + throwError('some error'), + ); + }); + + const captor1 = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor1); + + await captor1.waitUntilCompletedOrErrored(); + await expect(captor1.hasErrored()).toBeTrue(); + expect(captor1.getError().name).toEqual('RequestError'); + expect(captor1.getError().message).toEqual('some error'); + expect(captor1.getValues()).toEqual(['A']); + + const captor2 = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'b').subscribe(captor2); + + await captor2.waitUntilCompletedOrErrored(); + await expect(captor2.hasErrored()).toBeTrue(); + expect(captor2.getError().name).toEqual('RequestError'); + expect(captor2.getError().message).toEqual('some error'); + expect(captor2.getValues()).toEqual(['B']); + }); + + it('should not unregister the handler on error', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collected = []; + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collected.push(intentMessage.body); + throw Error('some error'); + }); + + const captor1 = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor1); + await captor1.waitUntilCompletedOrErrored(); + await expect(captor1.hasErrored()).toBeTrue(); + expect(captor1.getError().name).toEqual('RequestError'); + expect(captor1.getError().message).toEqual('some error'); + expect(captor1.getValues()).toEqual([]); + expect(collected).toEqual(['a']); + + const captor2 = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'b').subscribe(captor2); + await captor2.waitUntilCompletedOrErrored(); + await expect(captor2.hasErrored()).toBeTrue(); + expect(captor2.getError().name).toEqual('RequestError'); + expect(captor2.getError().message).toEqual('some error'); + expect(captor2.getValues()).toEqual([]); + expect(collected).toEqual(['a', 'b']); + }); + + it('should not unregister the handler on async error', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const collected = []; + Beans.get(IntentClient).onIntent({type: 'capability'}, intentMessage => { + collected.push(intentMessage.body); + return Promise.reject('some error'); + }); + + const captor1 = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'a').subscribe(captor1); + await captor1.waitUntilCompletedOrErrored(); + await expect(captor1.hasErrored()).toBeTrue(); + expect(captor1.getError().name).toEqual('RequestError'); + expect(captor1.getError().message).toEqual('some error'); + expect(captor1.getValues()).toEqual([]); + expect(collected).toEqual(['a']); + + const captor2 = new ObserveCaptor(bodyExtractFn); + Beans.get(IntentClient).request$({type: 'capability'}, 'b').subscribe(captor2); + await captor2.waitUntilCompletedOrErrored(); + await expect(captor2.hasErrored()).toBeTrue(); + expect(captor2.getError().name).toEqual('RequestError'); + expect(captor2.getError().message).toEqual('some error'); + expect(captor2.getValues()).toEqual([]); + expect(collected).toEqual(['a', 'b']); + }); + + it('should unsubscribe from the replier Observable when the requestor unsubscribes', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const replierConstruct$ = new Subject(); + const whenReplierConstruct = replierConstruct$.toPromise(); + + const replierTeardown$ = new Subject(); + const whenReplierTeardown = replierTeardown$.toPromise(); + + const replierFinalize$ = new Subject(); + const whenReplierFinalize = replierFinalize$.toPromise(); + + Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + return new Observable(() => { + replierConstruct$.complete(); + return () => replierTeardown$.complete(); + }) + .pipe(finalize(() => replierFinalize$.complete())); + }); + + const subscription = Beans.get(IntentClient).request$({type: 'capability'}).subscribe(); + await expectPromise(whenReplierConstruct).toResolve(); + subscription.unsubscribe(); + await expectPromise(whenReplierTeardown).toResolve(); + await expectPromise(whenReplierFinalize).toResolve(); + }); + + it('should unsubscribe the replier\'s and requestor\'s Observable when unregistering the handler', async () => { + const manifestUrl = serveManifest({name: 'Host App', capabilities: [{type: 'capability'}]}); + const registeredApps: ApplicationConfig[] = [{symbolicName: 'host-app', manifestUrl: manifestUrl}]; + await MicrofrontendPlatform.startHost(registeredApps, {symbolicName: 'host-app'}); + + const replierConstruct$ = new AsyncSubject(); + const replierTeardown$ = new AsyncSubject(); + const replierFinalize$ = new AsyncSubject(); + const requestorFinalize$ = new AsyncSubject(); + + const handlerSubscription = Beans.get(IntentClient).onIntent({type: 'capability'}, () => { + return new Observable(() => { + replierConstruct$.complete(); + return () => replierTeardown$.complete(); + }) + .pipe(finalize(() => replierFinalize$.complete())); + }); + + const requestorSubscription = Beans.get(IntentClient).request$({type: 'capability'}) + .pipe(finalize(() => requestorFinalize$.complete())) + .subscribe(); + + await expectPromise(replierConstruct$.toPromise()).toResolve(); + expect(requestorSubscription.closed).toBeFalse(); + + handlerSubscription.unsubscribe(); + await expectPromise(replierTeardown$.toPromise()).toResolve(); + await expectPromise(replierFinalize$.toPromise()).toResolve(); + await expectPromise(requestorFinalize$.toPromise()).toResolve(); + expect(requestorSubscription.closed).toBeTrue(); + }); + }); +}); diff --git a/projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.ts b/projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.ts new file mode 100644 index 00000000..319fa0fb --- /dev/null +++ b/projects/scion/microfrontend-platform/src/lib/client/messaging/message-handler.ts @@ -0,0 +1,101 @@ +import { MessageClient, takeUntilUnsubscribe } from './message-client'; +import { Message, MessageHeaders, ResponseStatusCodes } from '../../messaging.model'; +import { Beans } from '@scion/toolkit/bean-manager'; +import { Observable, Subscription, throwError } from 'rxjs'; +import { Observables } from '@scion/toolkit/util'; +import { runSafe } from '../../safe-runner'; +import { stringifyError } from '../../error.util'; +import { filter, finalize } from 'rxjs/operators'; + +/** + * Subscribes to messages, passing each message to the callback. + * + * The callback can return a response to be transported to the requestor, if any. When the final response is produced, + * the handler terminates the communication, completing the requestor's Observable. If the callback errors, the error is + * transported to the requestor, erroring the requestor's Observable. + * + * @ignore + */ +export class MessageHandler { + + private _messageClient = Beans.get(MessageClient); + + /** + * Represents this handler's subscription for receiving messages. Calling {@link Subscription.unsubscribe} will also complete + * the Observable of all requestors, if any. + */ + public readonly subscription = new Subscription(); + + constructor(message$: Observable, private _callback: (message: Message) => Observable | Promise | OUT | void) { + this.subscription.add(message$.subscribe(message => { + if (message.headers.has(MessageHeaders.ReplyTo)) { + this.handleMessage(message); + } + else { + this.consumeMessage(message); + } + })); + } + + /** + * The requestor has initiated a fire-and-forget communication, thus we simply pass the message to the callback and ignore response(s). + */ + private consumeMessage(message: Message): void { + runSafe(() => this._callback(message)); + } + + /** + * The requestor has initiated a request-response communication, thus we pass the request to the callback and send response(s) + * or a potential completion or error to the requestor. + */ + private handleMessage(request: Message): void { + const replyTo = request.headers.get(MessageHeaders.ReplyTo); + + // Invoke the callback to produce value(s). + let reply: Observable | Promise | OUT | void; + try { + reply = this._callback(request); + } + catch (error) { + reply = throwError(error); + } + + // Send response(s) or a potential completion or error back to the requestor. + let observableStatus: 'alive' | 'completed' | 'errored' = 'alive'; + this.subscription.add(Observables.coerce(reply) + .pipe( + filter(next => next !== undefined), // filter `undefined` responses, e.g., returned by void callbacks. + takeUntilUnsubscribe(replyTo), // unsubscribe once the requestor terminates the communication + finalize(() => { + // Note that the `finalize` operator is also called when unsubscribing from the observable, e.g. when unsubscribing + // from the handler. If the observable errors or completes, the `finalize` operator is guaranteed to be called after + // the observer's `complete` or `error` methods; thus, the variable `observableStatus` is only `alive` when + // unsubscribing from the observable. + if (observableStatus === 'alive') { + // Terminate the communication when the handler is being unsubscribed. + const replyHeaders = new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL); + this._messageClient.publish(replyTo, undefined, {headers: replyHeaders}).then(); + } + }), + ) + .subscribe( + next => { + // Transport the value to the requestor. + const replyHeaders = new Map().set(MessageHeaders.Status, ResponseStatusCodes.OK); + this._messageClient.publish(replyTo, next, {headers: replyHeaders}).then(); + }, + error => { + observableStatus = 'errored'; + // Transport the error to the requestor. + const replyHeaders = new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR); + this._messageClient.publish(replyTo, stringifyError(error), {headers: replyHeaders}).then(); + }, + () => { + observableStatus = 'completed'; + // Terminate the communication when finished producing responses. + const replyHeaders = new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL); + this._messageClient.publish(replyTo, undefined, {headers: replyHeaders}).then(); + }, + )); + } +} diff --git "a/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" "b/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" new file mode 100644 index 00000000..84e86797 --- /dev/null +++ "b/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265intent-client.ts" @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2018-2020 Swiss Federal Railways + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ + +import { defer, Observable, Subscription } from 'rxjs'; +import { Intent, IntentMessage, throwOnErrorStatus, TopicMessage } from '../../messaging.model'; +import { Qualifier } from '../../platform.model'; +import { BrokerGateway } from './broker-gateway'; +import { MessagingChannel } from '../../ɵmessaging.model'; +import { filterByChannel, pluckMessage } from '../../operators'; +import { filter } from 'rxjs/operators'; +import { matchesIntentQualifier } from '../../qualifier-tester'; +import { IntentClient, IntentOptions, IntentSelector } from './intent-client'; +import { Beans } from '@scion/toolkit/bean-manager'; +import { MessageHandler } from './message-handler'; + +export class ɵIntentClient implements IntentClient { // tslint:disable-line:class-name + + constructor(private readonly _brokerGateway: BrokerGateway) { + } + + public publish(intent: Intent, body?: T, options?: IntentOptions): Promise { + assertIntentQualifier(intent.qualifier, {allowWildcards: false}); + const headers = new Map(options && options.headers); + const intentMessage: IntentMessage = {intent, headers: new Map(headers)}; + setBodyIfDefined(intentMessage, body); + return this._brokerGateway.postMessage(MessagingChannel.Intent, intentMessage); + } + + public request$(intent: Intent, body?: any, options?: IntentOptions): Observable> { + assertIntentQualifier(intent.qualifier, {allowWildcards: false}); + // IMPORTANT: + // When sending a request, the platform adds various headers to the message. Therefore, to support multiple subscriptions + // to the returned Observable, each subscription must have its individual message instance and headers map. + // In addition, the headers are copied to prevent modifications before the effective subscription. + const headers = new Map(options && options.headers); + return defer(() => { + const intentMessage: IntentMessage = {intent, headers: new Map(headers)}; + setBodyIfDefined(intentMessage, body); + return this._brokerGateway.requestReply$(MessagingChannel.Intent, intentMessage).pipe(throwOnErrorStatus()); + }); + } + + public observe$(selector?: IntentSelector): Observable> { + return this._brokerGateway.message$ + .pipe( + filterByChannel>(MessagingChannel.Intent), + pluckMessage(), + filter(message => !selector || !selector.type || selector.type === message.intent.type), + filter(message => !selector || !selector.qualifier || matchesIntentQualifier(selector.qualifier, message.intent.qualifier)), + ); + } + + public onIntent(selector: IntentSelector, callback: (intentMessage: IntentMessage) => Observable | Promise | OUT | void): Subscription { + return new MessageHandler(Beans.get(IntentClient).observe$(selector), callback).subscription; + } +} + +function assertIntentQualifier(qualifier: Qualifier, options: { allowWildcards: boolean }): void { + if (!qualifier || Object.keys(qualifier).length === 0) { + return; + } + + if (!options.allowWildcards && Object.entries(qualifier).some(([key, value]) => key === '*' || value === '*' || value === '?')) { + throw Error(`[IllegalQualifierError] Qualifier must not contain wildcards. [qualifier='${JSON.stringify(qualifier)}']`); + } +} + +function setBodyIfDefined(message: TopicMessage | IntentMessage, body?: T): void { + if (body !== undefined) { + message.body = body; + } +} diff --git "a/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265message-client.ts" "b/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265message-client.ts" new file mode 100644 index 00000000..2c647626 --- /dev/null +++ "b/projects/scion/microfrontend-platform/src/lib/client/messaging/\311\265message-client.ts" @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2018-2020 Swiss Federal Railways + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +import { defer, Observable, Subscription } from 'rxjs'; +import { IntentMessage, mapToBody, throwOnErrorStatus, TopicMessage } from '../../messaging.model'; +import { BrokerGateway } from './broker-gateway'; +import { Defined } from '@scion/toolkit/util'; +import { MessagingChannel, PlatformTopics } from '../../ɵmessaging.model'; +import { TopicMatcher } from '../../topic-matcher.util'; +import { MessageClient, PublishOptions, RequestOptions } from './message-client'; +import { Beans } from '@scion/toolkit/bean-manager'; +import { MessageHandler } from './message-handler'; + +export class ɵMessageClient implements MessageClient { // tslint:disable-line:class-name + + constructor(private readonly _brokerGateway: BrokerGateway) { + } + + public publish(topic: string, message?: T, options?: PublishOptions): Promise { + assertTopic(topic, {allowWildcardSegments: false}); + const headers = new Map(options && options.headers); + const topicMessage: TopicMessage = {topic, retain: Defined.orElse(options && options.retain, false), headers: new Map(headers)}; + setBodyIfDefined(topicMessage, message); + return this._brokerGateway.postMessage(MessagingChannel.Topic, topicMessage); + } + + public request$(topic: string, request?: any, options?: RequestOptions): Observable> { + assertTopic(topic, {allowWildcardSegments: false}); + // IMPORTANT: + // When sending a request, the platform adds various headers to the message. Therefore, to support multiple subscriptions + // to the returned Observable, each subscription must have its individual message instance and headers map. + // In addition, the headers are copied to prevent modifications before the effective subscription. + const headers = new Map(options && options.headers); + return defer(() => { + const topicMessage: TopicMessage = {topic, retain: false, headers: new Map(headers)}; + setBodyIfDefined(topicMessage, request); + return this._brokerGateway.requestReply$(MessagingChannel.Topic, topicMessage).pipe(throwOnErrorStatus()); + }); + } + + public observe$(topic: string): Observable> { + assertTopic(topic, {allowWildcardSegments: true}); + return this._brokerGateway.subscribeToTopic(topic); + } + + public onMessage(topic: string, callback: (message: TopicMessage) => Observable | Promise | OUT | void): Subscription { + return new MessageHandler(Beans.get(MessageClient).observe$(topic), callback).subscription; + } + + public subscriberCount$(topic: string): Observable { + assertTopic(topic, {allowWildcardSegments: false}); + return this.request$(PlatformTopics.RequestSubscriberCount, topic).pipe(mapToBody()); + } +} + +function assertTopic(topic: string, options: { allowWildcardSegments: boolean }): void { + if (topic === undefined || topic === null || topic.length === 0) { + throw Error('[IllegalTopicError] Topic must not be `null`, `undefined` or empty'); + } + + if (!options.allowWildcardSegments && TopicMatcher.containsWildcardSegments(topic)) { + throw Error(`[IllegalTopicError] Topic not allowed to contain wildcard segments. [topic='${topic}']`); + } +} + +function setBodyIfDefined(message: TopicMessage | IntentMessage, body?: T): void { + if (body !== undefined) { + message.body = body; + } +} diff --git a/projects/scion/microfrontend-platform/src/lib/error.util.ts b/projects/scion/microfrontend-platform/src/lib/error.util.ts new file mode 100644 index 00000000..f4461dd7 --- /dev/null +++ b/projects/scion/microfrontend-platform/src/lib/error.util.ts @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2018-2020 Swiss Federal Railways + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ + +/** + * Returns the error message if given an error object, or the `toString` representation otherwise. + */ +export function stringifyError(error: any): string { + if (error instanceof Error) { + return error.message; + } + return error?.toString(); +} diff --git a/projects/scion/microfrontend-platform/src/lib/host/application-registry.spec.ts b/projects/scion/microfrontend-platform/src/lib/host/application-registry.spec.ts index f4ae5f00..b2da0d34 100644 --- a/projects/scion/microfrontend-platform/src/lib/host/application-registry.spec.ts +++ b/projects/scion/microfrontend-platform/src/lib/host/application-registry.spec.ts @@ -11,7 +11,7 @@ import { ApplicationRegistry } from './application-registry'; import { MicrofrontendPlatform } from '../microfrontend-platform'; import { ManifestRegistry } from './manifest-registry/manifest-registry'; import { PlatformMessageClient } from './platform-message-client'; -import { ɵMessageClient } from '../client/messaging/message-client'; +import { ɵMessageClient } from '../client/messaging/ɵmessage-client'; import { ɵManifestRegistry } from './manifest-registry/ɵmanifest-registry'; import { NullBrokerGateway } from '../client/messaging/broker-gateway'; import { Beans } from '@scion/toolkit/bean-manager'; diff --git "a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" "b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" index f68c53e5..152beaa4 100644 --- "a/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" +++ "b/projects/scion/microfrontend-platform/src/lib/host/manifest-registry/\311\265manifest-registry.ts" @@ -23,6 +23,7 @@ import { filterArray } from '@scion/toolkit/operators'; import { ManifestRegistry } from './manifest-registry'; import { matchesIntentQualifier, matchesWildcardQualifier } from '../../qualifier-tester'; import { Beans, PreDestroy } from '@scion/toolkit/bean-manager'; +import { stringifyError } from '../../error.util'; export class ɵManifestRegistry implements ManifestRegistry, PreDestroy { // tslint:disable-line:class-name @@ -153,7 +154,7 @@ export class ɵManifestRegistry implements ManifestRegistry, PreDestroy { // tsl Beans.get(PlatformMessageClient).publish(replyTo, capabilityId, {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL)}); } catch (error) { - Beans.get(PlatformMessageClient).publish(replyTo, readErrorMessage(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); + Beans.get(PlatformMessageClient).publish(replyTo, stringifyError(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); } })); } @@ -171,7 +172,7 @@ export class ɵManifestRegistry implements ManifestRegistry, PreDestroy { // tsl Beans.get(PlatformMessageClient).publish(replyTo, undefined, {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL)}); } catch (error) { - Beans.get(PlatformMessageClient).publish(replyTo, readErrorMessage(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); + Beans.get(PlatformMessageClient).publish(replyTo, stringifyError(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); } })); } @@ -190,7 +191,7 @@ export class ɵManifestRegistry implements ManifestRegistry, PreDestroy { // tsl Beans.get(PlatformMessageClient).publish(replyTo, intentionId, {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL)}); } catch (error) { - Beans.get(PlatformMessageClient).publish(replyTo, readErrorMessage(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); + Beans.get(PlatformMessageClient).publish(replyTo, stringifyError(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); } })); } @@ -209,7 +210,7 @@ export class ɵManifestRegistry implements ManifestRegistry, PreDestroy { // tsl Beans.get(PlatformMessageClient).publish(replyTo, undefined, {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL)}); } catch (error) { - Beans.get(PlatformMessageClient).publish(replyTo, readErrorMessage(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); + Beans.get(PlatformMessageClient).publish(replyTo, stringifyError(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}); } })); } @@ -277,16 +278,6 @@ export enum ManifestRegistryTopics { UnregisterIntentions = 'ɵUNREGISTER_INTENTIONS', } -/** - * Returns the error message if given an error object, or the `toString` representation otherwise. - */ -function readErrorMessage(error: any): string { - if (error instanceof Error) { - return error.message; - } - return error.toString(); -} - /** * Checks if the 'Intention Registration API' is enabled for the given app. If not, an error is thrown. */ diff --git a/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts b/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts index 26ba307d..22b2ed70 100644 --- a/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts +++ b/projects/scion/microfrontend-platform/src/lib/microfrontend-platform.ts @@ -7,8 +7,8 @@ * * SPDX-License-Identifier: EPL-2.0 */ -import { MessageClient, ɵMessageClient } from './client/messaging/message-client'; -import { IntentClient, ɵIntentClient } from './client/messaging/intent-client'; +import { MessageClient } from './client/messaging/message-client'; +import { IntentClient } from './client/messaging/intent-client'; import { PlatformIntentClient } from './host/platform-intent-client'; import { ManifestRegistry } from './host/manifest-registry/manifest-registry'; import { ApplicationRegistry } from './host/application-registry'; @@ -47,6 +47,8 @@ import { ActivatorInstaller } from './host/activator/activator-installer'; import { BrokerGateway, NullBrokerGateway, ɵBrokerGateway } from './client/messaging/broker-gateway'; import { PlatformState, Runlevel } from './platform-state'; import { AbstractType, BeanInstanceConstructInstructions, Beans, Type } from '@scion/toolkit/bean-manager'; +import { ɵIntentClient } from './client/messaging/ɵintent-client'; +import { ɵMessageClient } from './client/messaging/ɵmessage-client'; window.addEventListener('beforeunload', () => MicrofrontendPlatform.destroy(), {once: true}); diff --git a/projects/scion/microfrontend-platform/src/lib/safe-runner.ts b/projects/scion/microfrontend-platform/src/lib/safe-runner.ts index 43edadb1..2407ac6c 100644 --- a/projects/scion/microfrontend-platform/src/lib/safe-runner.ts +++ b/projects/scion/microfrontend-platform/src/lib/safe-runner.ts @@ -13,14 +13,25 @@ import { Beans } from '@scion/toolkit/bean-manager'; /** * Runs the given function. Errors are caught and logged. * + * If producing a Promise, returns that Promise, but with a catch handler installed. + * * @ignore */ export function runSafe(runnable: () => T): T { + let result: T; try { - return runnable(); + result = runnable(); } catch (error) { Beans.get(Logger).error('[UnexpectedError] An unexpected error occurred.', error); return undefined; } + + if (result instanceof Promise) { + return result.catch(error => { + Beans.get(Logger).error('[UnexpectedError] An unexpected error occurred.', error); + return undefined; + }) as any; + } + return result; }