Skip to content

Commit

Permalink
feat(platform): add convenience API to reduce code required to respon…
Browse files Browse the repository at this point in the history
…d to requests

This commits adds the methods `onMessage` to the 'MessageClient` and `onIntent` to the `IntentClient`. Unlike `observe$`, messages are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code required to respond to requests.

closes #43

BREAKING CHANGE: Adding the convenience API for responding to requests introduced the following breaking change for Angular projects.

> Note: The messaging protocol between the host and client HAS NOT CHANGED. Thus, you can upgrade the host and clients to the new version independently.

To migrate:
- If an Angular project, add the method `onMessage` to your NgZone message client decorator, as following:
   ```typescript
   public onMessage<IN = any, OUT = any>(topic: string, onMessage: (message: TopicMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
     return messageClient.onMessage(topic, onMessage);
   }
   ```
   See https://scion-microfrontend-platform-developer-guide.now.sh/#chapter:angular-integration-guide:preparing-messaging-for-use-with-angular for more information.

- If an Angular project, add the method `onIntent` to your NgZone intent client decorator, as following:
   ```typescript
   public onIntent<IN = any, OUT = any>(selector: IntentSelector, onIntent: (intentMessage: IntentMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
     return intentClient.onIntent(selector, onIntent);
   }
   ```
   See https://scion-microfrontend-platform-developer-guide.now.sh/#chapter:angular-integration-guide:preparing-messaging-for-use-with-angular for more information.
  • Loading branch information
danielwiehl authored and Marcarrian committed Feb 3, 2021
1 parent 77a4dd9 commit d0eeaf5
Show file tree
Hide file tree
Showing 18 changed files with 1,512 additions and 154 deletions.
12 changes: 10 additions & 2 deletions apps/microfrontend-platform-devtools/src/app/ng-zone-decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
*/

import { BeanDecorator } from '@scion/toolkit/bean-manager';
import { Intent, IntentClient, IntentMessage, IntentOptions, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform';
import { Intent, IntentClient, IntentMessage, IntentOptions, IntentSelector, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform';
import { Injectable, NgZone } from '@angular/core';
import { MonoTypeOperatorFunction, Observable, pipe } from 'rxjs';
import { MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs';
import { observeInside, subscribeInside } from '@scion/toolkit/operators';

/**
Expand Down Expand Up @@ -39,6 +39,10 @@ export class NgZoneMessageClientDecorator implements BeanDecorator<MessageClient
return messageClient.observe$<T>(topic).pipe(synchronizeWithAngular(zone));
}

public onMessage<IN = any, OUT = any>(topic: string, callback: (message: TopicMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
return messageClient.onMessage(topic, callback);
}

public subscriberCount$(topic: string): Observable<number> {
return messageClient.subscriberCount$(topic).pipe(synchronizeWithAngular(zone));
}
Expand Down Expand Up @@ -70,6 +74,10 @@ export class NgZoneIntentClientDecorator implements BeanDecorator<IntentClient>
public observe$<T>(selector?: Intent): Observable<IntentMessage<T>> {
return intentClient.observe$<T>(selector).pipe(synchronizeWithAngular(zone));
}

public onIntent<IN = any, OUT = any>(selector: IntentSelector, callback: (intentMessage: IntentMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
return intentClient.onIntent(selector, callback);
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
*/

import { BeanDecorator } from '@scion/toolkit/bean-manager';
import { Intent, IntentClient, IntentMessage, IntentOptions, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform';
import { Intent, IntentClient, IntentMessage, IntentOptions, IntentSelector, MessageClient, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform';
import { Injectable, NgZone } from '@angular/core';
import { MonoTypeOperatorFunction, Observable, pipe } from 'rxjs';
import { MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs';
import { observeInside, subscribeInside } from '@scion/toolkit/operators';

/**
Expand Down Expand Up @@ -39,6 +39,10 @@ export class NgZoneMessageClientDecorator implements BeanDecorator<MessageClient
return messageClient.observe$<T>(topic).pipe(synchronizeWithAngular(zone));
}

public onMessage<IN = any, OUT = any>(topic: string, callback: (message: TopicMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
return messageClient.onMessage(topic, callback);
}

public subscriberCount$(topic: string): Observable<number> {
return messageClient.subscriberCount$(topic).pipe(synchronizeWithAngular(zone));
}
Expand Down Expand Up @@ -70,6 +74,10 @@ export class NgZoneIntentClientDecorator implements BeanDecorator<IntentClient>
public observe$<T>(selector?: Intent): Observable<IntentMessage<T>> {
return intentClient.observe$<T>(selector).pipe(synchronizeWithAngular(zone));
}

public onIntent<IN = any, OUT = any>(selector: IntentSelector, callback: (intentMessage: IntentMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
return intentClient.onIntent(selector, callback);
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Intent, IntentClient, IntentMessage, IntentSelector, MessageClient, MessageHeaders, OutletRouter, PRIMARY_OUTLET, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform';
import { Subject } from 'rxjs';
import { Beans } from '@scion/toolkit/bean-manager';
import { take } from 'rxjs/operators';
import { map, take } from 'rxjs/operators';

`
// tag::intention-declaration[]
Expand Down Expand Up @@ -121,7 +121,7 @@ import { take } from 'rxjs/operators';

{
const authService = new class {
userAccessToken$ = new Subject<string>();
public userAccessToken$ = new Subject<string>();
};

// tag::reply[]
Expand Down Expand Up @@ -155,3 +155,26 @@ import { take } from 'rxjs/operators';
});
// end::reply[]
}

{
const authService = new class {
public userAccessToken$ = new Subject<string>();
};

// tag::onIntent[]
const selector: IntentSelector = {
type: 'auth',
qualifier: {entity: 'user-access-token'},
};

// Stream data as long as the requestor is subscribed to receive replies.
Beans.get(IntentClient).onIntent(selector, intentMessage => {
return authService.userAccessToken$;
});

// Alternatively, you can complete the requestor's Observable with the first reply.
Beans.get(IntentClient).onIntent(selector, intentMessage => {
return authService.userAccessToken$.pipe(take(1));
});
// end::onIntent[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ In this Chapter
- <<chapter:intent-based-messaging:handling-intents>>
- <<chapter:intent-based-messaging:issuing-an-intent-with-headers>>
- <<chapter:intent-based-messaging:request-response-message-exchange-pattern>>
- <<chapter:intent-based-messaging:convenience-api-for-handling-intents>>
****
'''

Expand Down Expand Up @@ -202,3 +203,15 @@ include::intent-based-communication.snippets.ts[tags=reply]

TIP: If streaming data like in the example above, the replier can use the RxJS `takeUntilUnsubscribe` operator of the platform to stop replying when the requestor unsubscribes.

[[chapter:intent-based-messaging:convenience-api-for-handling-intents]]
[discrete]
=== Convenience API for handling messages
The intent client provides the `onIntent` method as a convenience to the `observe$` method. Unlike `observe$`, intents are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code required to respond to requests.

[source,typescript]
----
include::intent-based-communication.snippets.ts[tags=onIntent]
----

For each intent received, the specified callback function is called. When used in request-response communication, the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for streaming data. In either case, when the final response is produced, the handler terminates the communication, completing the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is transported to the requestor, erroring the requestor's Observable.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MessageClient, MessageHeaders, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform';
import { Subject } from 'rxjs';
import { Beans } from '@scion/toolkit/bean-manager';
import { take } from 'rxjs/operators';
import { map, take } from 'rxjs/operators';

{
// tag::publish[]
Expand Down Expand Up @@ -102,3 +102,21 @@ import { take } from 'rxjs/operators';
});
// end::reply[]
}

{
const sensor$ = new Subject<number>();

// tag::onMessage[]
const topic: string = 'myhome/livingroom/temperature';

// Stream data as long as the requestor is subscribed to receive replies.
Beans.get(MessageClient).onMessage(topic, message => {
return sensor$.pipe(map(temperature => `${temperature} °C`));
});

// Alternatively, you can complete the requestor's Observable with the first reply.
Beans.get(MessageClient).onMessage(topic, message => {
return sensor$.pipe(map(temperature => `${temperature} °C`), take(1));
});
// end::onMessage[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ In this Chapter
- <<chapter:topic-based-messaging:publishing-a-retained-message>>
- <<chapter:topic-based-messaging:publishing-a-message-with-headers>>
- <<chapter:topic-based-messaging:request-response-message-exchange-pattern>>
- <<chapter:topic-based-messaging:convenience-api-for-handling-messages>>
****

Expand Down Expand Up @@ -159,3 +160,16 @@ include::topic-based-communication.snippets.ts[tags=reply]

TIP: If streaming data like in the example above, the replier can use the RxJS `takeUntilUnsubscribe` operator of the platform to stop replying when the requestor unsubscribes.


[[chapter:topic-based-messaging:convenience-api-for-handling-messages]]
[discrete]
=== Convenience API for handling messages
The message client provides the `onMessage` method as a convenience to the `observe$` method. Unlike `observe$`, messages are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code required to respond to requests.

[source,typescript]
----
include::topic-based-communication.snippets.ts[tags=onMessage]
----

For each message received, the specified callback function is called. When used in request-response communication, the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for streaming data. In either case, when the final response is produced, the handler terminates the communication, completing the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is transported to the requestor, erroring the requestor's Observable.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Intent, IntentClient, IntentMessage, IntentOptions, MessageClient, MicrofrontendPlatform, PlatformState, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform';
import { Intent, IntentClient, IntentMessage, IntentOptions, IntentSelector, MessageClient, MicrofrontendPlatform, PlatformState, PublishOptions, RequestOptions, TopicMessage } from '@scion/microfrontend-platform';
import { Injectable, NgZone } from '@angular/core';
import { MonoTypeOperatorFunction, Observable, pipe } from 'rxjs';
import { MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs';
import { HttpPlatformConfigLoader } from './start-platform-via-initializer.snippets';
import { BeanDecorator, Beans } from '@scion/toolkit/bean-manager';
import { observeInside, subscribeInside } from '@scion/toolkit/operators';
Expand Down Expand Up @@ -33,6 +33,10 @@ export class NgZoneMessageClientDecorator implements BeanDecorator<MessageClient
return messageClient.observe$<T>(topic).pipe(synchronizeWithAngular(zone)); // <3>
}

public onMessage<IN = any, OUT = any>(topic: string, callback: (message: TopicMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
return messageClient.onMessage(topic, callback);
}

public subscriberCount$(topic: string): Observable<number> {
return messageClient.subscriberCount$(topic).pipe(synchronizeWithAngular(zone)); // <3>
}
Expand Down Expand Up @@ -67,6 +71,10 @@ export class NgZoneIntentClientDecorator implements BeanDecorator<IntentClient>
public observe$<T>(selector?: Intent): Observable<IntentMessage<T>> {
return intentClient.observe$<T>(selector).pipe(synchronizeWithAngular(zone)); // <3>
}

public onIntent<IN = any, OUT = any>(selector: IntentSelector, callback: (intentMessage: IntentMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription {
return intentClient.onIntent(selector, callback);
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,14 @@
* SPDX-License-Identifier: EPL-2.0
*/

import { defer, Observable } from 'rxjs';
import { Intent, IntentMessage, throwOnErrorStatus, TopicMessage } from '../../messaging.model';
import { Observable, Subscription } from 'rxjs';
import { Intent, IntentMessage, TopicMessage } from '../../messaging.model';
import { Qualifier } from '../../platform.model';
import { BrokerGateway } from './broker-gateway';
import { MessagingChannel } from '../../ɵmessaging.model';
import { filterByChannel, pluckMessage } from '../../operators';
import { filter } from 'rxjs/operators';
import { matchesIntentQualifier } from '../../qualifier-tester';

/**
* Intent client for sending and receiving intents between microfrontends across origins.
*
* Intent-based communication enables controlled collaboration between micro applications. It is inspired by the Android platform,
* Intent-based messaging enables controlled collaboration between micro applications, a mechanism known from Android development
* where an application can start an activity via an intent (such as sending an email).
*
* Like topic-based communication, intent-based communication implements the pub/sub (publish/subscribe) messaging pattern, but is,
Expand Down Expand Up @@ -147,47 +142,30 @@ export abstract class IntentClient {
* @return An Observable that emits intents for which this application provides a satisfying capability. It never completes.
*/
public abstract observe$<T>(selector?: IntentSelector): Observable<IntentMessage<T>>;
}

/**
* @ignore
*/
export class ɵIntentClient implements IntentClient { // tslint:disable-line:class-name

constructor(private readonly _brokerGateway: BrokerGateway) {
}

public publish<T = any>(intent: Intent, body?: T, options?: IntentOptions): Promise<void> {
assertIntentQualifier(intent.qualifier, {allowWildcards: false});
const headers = new Map(options && options.headers);
const intentMessage: IntentMessage = {intent, headers: new Map(headers)};
setBodyIfDefined(intentMessage, body);
return this._brokerGateway.postMessage(MessagingChannel.Intent, intentMessage);
}

public request$<T>(intent: Intent, body?: any, options?: IntentOptions): Observable<TopicMessage<T>> {
assertIntentQualifier(intent.qualifier, {allowWildcards: false});
// IMPORTANT:
// When sending a request, the platform adds various headers to the message. Therefore, to support multiple subscriptions
// to the returned Observable, each subscription must have its individual message instance and headers map.
// In addition, the headers are copied to prevent modifications before the effective subscription.
const headers = new Map(options && options.headers);
return defer(() => {
const intentMessage: IntentMessage = {intent, headers: new Map(headers)};
setBodyIfDefined(intentMessage, body);
return this._brokerGateway.requestReply$(MessagingChannel.Intent, intentMessage).pipe(throwOnErrorStatus());
});
}

public observe$<T>(selector?: IntentSelector): Observable<IntentMessage<T>> {
return this._brokerGateway.message$
.pipe(
filterByChannel<IntentMessage<T>>(MessagingChannel.Intent),
pluckMessage(),
filter(message => !selector || !selector.type || selector.type === message.intent.type),
filter(message => !selector || !selector.qualifier || matchesIntentQualifier(selector.qualifier, message.intent.qualifier)),
);
}
/**
* Convenience API for handling intents.
*
* Unlike `observe$`, intents are passed to a callback function rather than emitted from an Observable. Response(s) can be returned directly
* from the callback. It supports error propagation and request termination. Using this method over `observe$` significantly reduces the code
* required to respond to requests.
*
* For each intent received, the specified callback function is called. When used in request-response communication,
* the callback function can return the response either directly or in the form of a Promise or Observable. Returning a Promise
* allows the response to be computed asynchronously, and an Observable allows to return one or more responses, e.g., for
* streaming data. In either case, when the final response is produced, the handler terminates the communication, completing
* the requestor's Observable. If the callback throws an error, or the returned Promise or Observable errors, the error is
* transported to the requestor, erroring the requestor's Observable.
*
* @param selector - Allows filtering intents.
* For more information, see the API description of {@link observe$}.
* @param callback - Specifies the callback to be called for each intent. When used in request-response communication,
* the callback function can return the response either directly or in the form of a Promise or Observable. If returning
* a response in fire-and-forget communication, it is ignored. Throwing an error in the callback does not unregister the callback.
* @return Subscription to unregister the callback. Calling {@link Subscription.unsubscribe} will complete the Observable of all
* requestors, if any.
*/
public abstract onIntent<IN = any, OUT = any>(selector: IntentSelector, callback: (intentMessage: IntentMessage<IN>) => Observable<OUT> | Promise<OUT> | OUT | void): Subscription;
}

/**
Expand Down Expand Up @@ -218,19 +196,3 @@ export interface IntentSelector {
*/
qualifier?: Qualifier;
}

function assertIntentQualifier(qualifier: Qualifier, options: { allowWildcards: boolean }): void {
if (!qualifier || Object.keys(qualifier).length === 0) {
return;
}

if (!options.allowWildcards && Object.entries(qualifier).some(([key, value]) => key === '*' || value === '*' || value === '?')) {
throw Error(`[IllegalQualifierError] Qualifier must not contain wildcards. [qualifier='${JSON.stringify(qualifier)}']`);
}
}

function setBodyIfDefined<T>(message: TopicMessage<T> | IntentMessage<T>, body?: T): void {
if (body !== undefined) {
message.body = body;
}
}
Loading

0 comments on commit d0eeaf5

Please sign in to comment.