Skip to content

Commit

Permalink
feat(platform): let the message/intent replier control the lifecycle …
Browse files Browse the repository at this point in the history
…of the requestor’s Observable

In request-response communication, by default, the requestor’s Observable never completes. However, the replier can include a response status code in the reply’s headers, allowing to control the lifecycle of the requestor’s Observable. For example, the status code `250` allows completing the requestor’s Observable after emitted the reply, or the status code `500` to error the Observable.

- `250 (ResponseStatusCodes.TERMINAL)`: In request-reply communication, setting this status code will complete the requestor’s Observable after emitted the reply.
- `500 (ResponseStatusCodes.ERROR)`: In request-reply communication, setting this status code will error the requestor’s Observable.

See the enum `ResponseStatusCodes` for available status codes.
See https://scion-microfrontend-platform-developer-guide.now.sh/#chapter:topic-based-messaging:request-response-message-exchange-pattern for an example.

> Note that the platform evaluates status codes only in request-response communication. They are ignored when observing topics or intents in pub/sub communication but can still be used; however, they must be handled by the application, e.g., by using the `throwOnErrorStatus` SCION RxJS operator.

BREAKING CHANGE: Enabling the message/intent replier to control the requestor’s Observable lifecycle introduced a breaking change in the host/client communication protocol.

The messaging protocol between host and client HAS CHANGED for registering/unregistering capabilities/intentions using the `ManifestService`. Therefore, you must update the host and affected clients to the new version together. The API has not changed; the breaking change only applies to the `@scion/microfrontend-platform` version.

To migrate:
- Upgrade host and clients (which use the `ManifestService`) to `@scion/[email protected]`.
- Remove the `throwOnErrorStatus` SCION RxJS operator when using `IntentClient#request$` or `MessageClient#request$` as already installed by the platform.
  • Loading branch information
danielwiehl authored and Marcarrian committed Feb 3, 2021
1 parent 4af9433 commit 77a4dd9
Show file tree
Hide file tree
Showing 11 changed files with 521 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Intent, IntentClient, IntentMessage, IntentSelector, MessageClient, MessageHeaders, OutletRouter, PRIMARY_OUTLET, takeUntilUnsubscribe } from '@scion/microfrontend-platform';
import { Intent, IntentClient, IntentMessage, IntentSelector, MessageClient, MessageHeaders, OutletRouter, PRIMARY_OUTLET, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform';
import { Subject } from 'rxjs';
import { Beans } from '@scion/toolkit/bean-manager';
import { take } from 'rxjs/operators';

`
// tag::intention-declaration[]
Expand Down Expand Up @@ -129,13 +130,28 @@ import { Beans } from '@scion/toolkit/bean-manager';
qualifier: {entity: 'user-access-token'},
};

// Stream data as long as the requestor is subscribed to receive replies.
Beans.get(IntentClient).observe$(selector).subscribe((request: IntentMessage) => {
const replyTo = request.headers.get(MessageHeaders.ReplyTo); // <1>

authService.userAccessToken$
.pipe(takeUntilUnsubscribe(replyTo)) // <3>
.subscribe(token => {
Beans.get(MessageClient).publish(replyTo, token); // <2>
});
});

// Alternatively, you can complete the requestor's Observable with the first reply.
Beans.get(IntentClient).observe$(selector).subscribe((request: IntentMessage) => {
const replyTo = request.headers.get(MessageHeaders.ReplyTo); // <1>
const headers = new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL); // <4>

authService.userAccessToken$
.pipe(take(1))
.subscribe(token => {
Beans.get(MessageClient).publish(replyTo, token, {headers});
});

});
// end::reply[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ In this Chapter
[[chapter:intent-based-messaging:what-is-intent-based-messaging]]
[discrete]
=== What is Intent-Based Messaging?
Intent-based messaging enables controlled collaboration between micro applications. It is inspired by the Android platform where an application can start an _Activity_ via an _Intent_ (such as sending an email).
Intent-based messaging enables controlled collaboration between micro applications, a mechanism known from Android development where an application can start an _Activity_ via an _Intent_ (such as sending an email).

This kind of communication is similar to the <<chapter:topic-based-messaging,topic-based>> communication, thus implements also the publish-subscribe messaging pattern, but additionally requires the sending application to declare an intention in its manifest. Unlike topic-based communication, the message (also called the intent) is exclusively transported to micro applications that provide a fulfilling capability through their manifest.

Expand Down Expand Up @@ -176,7 +176,14 @@ include::intent-based-communication.snippets.ts[tags=request]
<1> Initiates a _request-response_ communication by invoking the `request$` method on the `IntentClient`. In this example, we request the user’s access token.
<2> Prints the received token to the console.

TIP: The `Observable` never completes. If expecting a single reply, use the `take(1)` RxJS operator to unsubscribe upon the receipt of the first reply.
[NOTE]
====
In request-response communication, by default, the requestor’s Observable never completes. However, the replier can include a response status code in the reply’s headers, allowing to control the lifecycle of the requestor’s Observable. For example, the status code `250` `ResponseStatusCodes.TERMINAL` allows completing the requestor’s Observable after emitted the reply, or the status code `500` `ResponseStatusCodes.ERROR` to error the Observable. See the enum `ResponseStatusCodes` for available status codes.
If the replier does not complete the communication, the requestor can use the `take(1)` RxJS operator to unsubscribe upon the receipt of the first reply.
Note that the platform evaluates status codes only in request-response communication. They are ignored when observing topics or intents in pub/sub communication but can still be used; however, they must be handled by the application, e.g., by using the `throwOnErrorStatus` SCION RxJS operator.
====

'''

Expand All @@ -191,6 +198,7 @@ include::intent-based-communication.snippets.ts[tags=reply]
<1> Reads the `ReplyTo` topic from the request where to send replies to.
<2> Sends the user’s access token to the requestor.
<3> Stops replying when the requestor unsubscribes.
<4> Sets a message header to immediately complete the requestor's Observable after emitted the reply.

TIP: If streaming data like in the example above, the replier can use the RxJS `takeUntilUnsubscribe` operator of the platform to stop replying when the requestor unsubscribes.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MessageClient, MessageHeaders, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform';
import { MessageClient, MessageHeaders, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform';
import { Subject } from 'rxjs';
import { Beans } from '@scion/toolkit/bean-manager';
import { take } from 'rxjs/operators';

{
// tag::publish[]
Expand Down Expand Up @@ -76,13 +77,28 @@ import { Beans } from '@scion/toolkit/bean-manager';
// tag::reply[]
const topic: string = 'myhome/livingroom/temperature';

// Stream data as long as the requestor is subscribed to receive replies.
Beans.get(MessageClient).observe$(topic).subscribe((request: TopicMessage) => {
const replyTo = request.headers.get(MessageHeaders.ReplyTo); // <1>

sensor$
.pipe(takeUntilUnsubscribe(replyTo)) // <3>
.subscribe(temperature => {
Beans.get(MessageClient).publish(replyTo, `${temperature} °C`); // <2>
});
});

// Alternatively, you can complete the requestor's Observable with the first reply.
Beans.get(MessageClient).observe$(topic).subscribe((request: TopicMessage) => {
const replyTo = request.headers.get(MessageHeaders.ReplyTo); // <1>
const headers = new Map().set(MessageHeaders.Status, ResponseStatusCodes.TERMINAL); // <4>

sensor$
.pipe(take(1))
.subscribe(temperature => {
Beans.get(MessageClient).publish(replyTo, `${temperature} °C`, {headers});
});

});
// end::reply[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ include::topic-based-communication.snippets.ts[tags=request]
<1> Initiates a _request-response_ communication by invoking the `request$` method on the `MessageClient`.
<2> Prints the received replies to the console.

TIP: The `Observable` never completes. If expecting a single reply, use the `take(1)` RxJS operator to unsubscribe upon the receipt of the first reply.
[NOTE]
====
In request-response communication, by default, the requestor’s Observable never completes. However, the replier can include a response status code in the reply’s headers, allowing to control the lifecycle of the requestor’s Observable. For example, the status code `250` `ResponseStatusCodes.TERMINAL` allows completing the requestor’s Observable after emitted the reply, or the status code `500` `ResponseStatusCodes.ERROR` to error the Observable. See the enum `ResponseStatusCodes` for available status codes.
If the replier does not complete the communication, the requestor can use the `take(1)` RxJS operator to unsubscribe upon the receipt of the first reply.
Note that the platform evaluates status codes only in request-response communication. They are ignored when observing topics or intents in pub/sub communication but can still be used; however, they must be handled by the application, e.g., by using the `throwOnErrorStatus` SCION RxJS operator.
====

'''

Expand All @@ -148,6 +155,7 @@ include::topic-based-communication.snippets.ts[tags=reply]
<1> Reads the `ReplyTo` topic from the request where to send replies to.
<2> Sends the temperature to the requestor.
<3> Stops replying when the requestor unsubscribes.
<4> Sets a message header to immediately complete the requestor's Observable after emitted the reply.

TIP: If streaming data like in the example above, the replier can use the RxJS `takeUntilUnsubscribe` operator of the platform to stop replying when the requestor unsubscribes.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { mergeMapTo, take, takeUntil } from 'rxjs/operators';
import { PlatformTopics } from '../../ɵmessaging.model';
import { ManifestRegistryTopics } from '../../host/manifest-registry/ɵmanifest-registry';
import { ManifestObjectFilter } from '../../host/manifest-registry/manifest-object-store';
import { mapToBody, throwOnErrorStatus } from '../../messaging.model';
import { mapToBody } from '../../messaging.model';
import { Beans, PreDestroy } from '@scion/toolkit/bean-manager';

/**
Expand Down Expand Up @@ -87,10 +87,7 @@ export class ManifestService implements PreDestroy {
*/
public lookupCapabilities$<T extends Capability>(filter?: ManifestObjectFilter): Observable<T[]> {
return this._messageClient.request$<T[]>(ManifestRegistryTopics.LookupCapabilities, filter)
.pipe(
throwOnErrorStatus(),
mapToBody(),
);
.pipe(mapToBody());
}

/**
Expand All @@ -108,10 +105,7 @@ export class ManifestService implements PreDestroy {
*/
public lookupIntentions$(filter?: ManifestObjectFilter): Observable<Intention[]> {
return this._messageClient.request$<Intention[]>(ManifestRegistryTopics.LookupIntentions, filter)
.pipe(
throwOnErrorStatus(),
mapToBody(),
);
.pipe(mapToBody());
}

/**
Expand All @@ -122,11 +116,7 @@ export class ManifestService implements PreDestroy {
*/
public registerCapability<T extends Capability>(capability: T): Promise<string> {
return this._messageClient.request$<string>(ManifestRegistryTopics.RegisterCapability, capability)
.pipe(
throwOnErrorStatus(),
take(1),
mapToBody(),
)
.pipe(mapToBody())
.toPromise();
}

Expand All @@ -145,11 +135,7 @@ export class ManifestService implements PreDestroy {
*/
public unregisterCapabilities(filter?: ManifestObjectFilter): Promise<void> {
return this._messageClient.request$<void>(ManifestRegistryTopics.UnregisterCapabilities, filter)
.pipe(
throwOnErrorStatus(),
take(1),
mergeMapTo(EMPTY),
)
.pipe(mergeMapTo(EMPTY))
.toPromise()
.then(() => Promise.resolve()); // resolve to `void`
}
Expand All @@ -164,11 +150,7 @@ export class ManifestService implements PreDestroy {
*/
public registerIntention(intention: Intention): Promise<string> {
return this._messageClient.request$<string>(ManifestRegistryTopics.RegisterIntention, intention)
.pipe(
throwOnErrorStatus(),
take(1),
mapToBody(),
)
.pipe(mapToBody())
.toPromise();
}

Expand All @@ -187,11 +169,7 @@ export class ManifestService implements PreDestroy {
*/
public unregisterIntentions(filter?: ManifestObjectFilter): Promise<void> {
return this._messageClient.request$<void>(ManifestRegistryTopics.UnregisterIntentions, filter)
.pipe(
throwOnErrorStatus(),
take(1),
mergeMapTo(EMPTY),
)
.pipe(mergeMapTo(EMPTY))
.toPromise()
.then(() => Promise.resolve()); // resolve to `void`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/

import { defer, Observable } from 'rxjs';
import { Intent, IntentMessage, TopicMessage } from '../../messaging.model';
import { Intent, IntentMessage, throwOnErrorStatus, TopicMessage } from '../../messaging.model';
import { Qualifier } from '../../platform.model';
import { BrokerGateway } from './broker-gateway';
import { MessagingChannel } from '../../ɵmessaging.model';
Expand Down Expand Up @@ -78,9 +78,10 @@ export abstract class IntentClient {
* @param body - Specifies optional transfer data to be carried along with the intent.
* It can be any object which is serializable with the structured clone algorithm.
* @param options - Controls how to send the request and allows setting request headers.
* @return An Observable that emits when receiving a reply. It never completes. It throws an error if the intent
* could not be dispatched or if no replier is currently available to handle the intent. If expecting a single reply,
* use the `take(1)` operator to unsubscribe upon the receipt of the first reply.
* @return An Observable that emits when receiving a reply. It never completes unless the intent handler sets the status code {@link ResponseStatusCodes.TERMINAL}
* in the {@link MessageHeaders.Status} message header. Then, the Observable completes immediately after emitted the reply.
* The Observable errors if the intent could not be dispatched or if no replier is currently available to handle the intent. It will also error if the
* intent handler sets a status code greater than or equal to 400, e.g., {@link ResponseStatusCodes.ERROR}.
*/
public abstract request$<T>(intent: Intent, body?: any, options?: IntentOptions): Observable<TopicMessage<T>>;

Expand Down Expand Up @@ -174,7 +175,7 @@ export class ɵIntentClient implements IntentClient { // tslint:disable-line:cla
return defer(() => {
const intentMessage: IntentMessage = {intent, headers: new Map(headers)};
setBodyIfDefined(intentMessage, body);
return this._brokerGateway.requestReply$(MessagingChannel.Intent, intentMessage);
return this._brokerGateway.requestReply$(MessagingChannel.Intent, intentMessage).pipe(throwOnErrorStatus());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* SPDX-License-Identifier: EPL-2.0
*/
import { defer, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { IntentMessage, mapToBody, TopicMessage } from '../../messaging.model';
import { IntentMessage, mapToBody, throwOnErrorStatus, TopicMessage } from '../../messaging.model';
import { first, takeUntil } from 'rxjs/operators';
import { BrokerGateway } from './broker-gateway';
import { Defined } from '@scion/toolkit/util';
Expand Down Expand Up @@ -74,9 +74,10 @@ export abstract class MessageClient {
* @param request - Specifies optional transfer data to be carried along with the request.
* It can be any object which is serializable with the structured clone algorithm.
* @param options - Controls how to send the request and allows setting request headers.
* @return An Observable that emits when receiving a reply. It never completes. It throws an error if the message
* could not be dispatched or if no replier is currently subscribed to the topic. If expecting a single reply,
* use the `take(1)` operator to unsubscribe upon the receipt of the first reply.
* @return An Observable that emits when receiving a reply. It never completes unless the replier sets the status code {@link ResponseStatusCodes.TERMINAL}
* in the {@link MessageHeaders.Status} message header. Then, the Observable completes immediately after emitted the reply.
* The Observable errors if the message could not be dispatched or if no replier is currently subscribed to the topic. It will also error if the
* replier sets a status code greater than or equal to 400, e.g., {@link ResponseStatusCodes.ERROR}.
*/
public abstract request$<T>(topic: string, request?: any, options?: RequestOptions): Observable<TopicMessage<T>>;

Expand Down Expand Up @@ -200,7 +201,7 @@ export class ɵMessageClient implements MessageClient { // tslint:disable-line:c
return defer(() => {
const topicMessage: TopicMessage = {topic, retain: false, headers: new Map(headers)};
setBodyIfDefined(topicMessage, request);
return this._brokerGateway.requestReply$(MessagingChannel.Topic, topicMessage);
return this._brokerGateway.requestReply$(MessagingChannel.Topic, topicMessage).pipe(throwOnErrorStatus());
});
}

Expand Down
Loading

0 comments on commit 77a4dd9

Please sign in to comment.