Skip to content

Commit

Permalink
feat(platform/messaging): support sending retained requests to a topic
Browse files Browse the repository at this point in the history
Previously, the platform only supported sending retained messages, but not retained requests. Unlike retained messages, retained requests do not replace previously sent retained messages or requests, and are removed when the requestor unsubscribes.
  • Loading branch information
danielwiehl authored and Marcarrian committed Nov 8, 2022
1 parent b578317 commit 1098f8f
Show file tree
Hide file tree
Showing 20 changed files with 805 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<sci-checkbox [formControlName]="REQUEST_REPLY" class="e2e-request-reply"></sci-checkbox>
</sci-form-field>

<sci-form-field label="Retain" *ngIf="isTopicFlavor() && !isRequestReply()">
<sci-form-field label="Retain" *ngIf="isTopicFlavor()">
<sci-checkbox [formControlName]="RETAIN" class="e2e-retain"></sci-checkbox>
</sci-form-field>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,17 @@ export class PublishMessageComponent implements OnDestroy {
this.publishError = null;
try {
if (requestReply) {
this._requestResponseSubscription = this._messageClient.request$(topic, message, {headers: headers})
this._requestResponseSubscription = this._messageClient.request$(topic, message, {retain: this.form.get(RETAIN).value, headers})
.pipe(finalize(() => this.markPublishing(false)))
.subscribe({
next: reply => this.replies.push(reply),
error: error => this.publishError = error,
});
}
else {
this._messageClient.publish(topic, message, {retain: this.form.get(RETAIN).value, headers: headers})
this._messageClient.publish(topic, message, {retain: this.form.get(RETAIN).value, headers})
.catch(error => {
this.publishError = error;
this.publishError = error?.message ?? `${error}`;
})
.finally(() => {
this.markPublishing(false);
Expand Down Expand Up @@ -192,7 +192,7 @@ export class PublishMessageComponent implements OnDestroy {
else {
this._intentClient.publish({type, qualifier, params}, message, {headers: headers})
.catch(error => {
this.publishError = error;
this.publishError = error?.message ?? `${error}`;
})
.finally(() => {
this.markPublishing(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ A microfrontend can receive messages published to a topic as following.
include::topic-based-messaging.snippets.ts[tags=subscribe]
----
<1> Specifies the topic which to observe.
<2> Prints the body of the received message to the console, which is `22 °C` in this example.
<2> Prints the body of the received message to the console, which is `22°C` in this example.

[[chapter:topic-based-messaging:wildcard-subscription]]
[discrete]
Expand All @@ -76,7 +76,7 @@ include::topic-based-messaging.snippets.ts[tags=subscribe-with-wildcard-segments
[[chapter:topic-based-messaging:publishing-a-retained-message]]
[discrete]
=== Publishing a Retained Message
The platform supports publishing a message as a _retained_ message. Retained messages help newly-subscribed clients to get the last message published to a topic immediately upon subscription. The broker stores one retained message per topic. To delete a retained message, send a retained message without a body to the topic. Deletion messages are not transported to subscribers.
The platform supports publishing a message as a _retained_ message. Retained messages help newly subscribed clients to get the last message published to a topic immediately upon subscription. The broker stores one retained message per topic, i.e., a later sent retained message will replace a previously sent retained message. To delete a retained message, send a retained message without payload to the topic. Deletion messages are not transported to subscribers.

The following example shows how to publish a message as a _retained_ message.

Expand Down Expand Up @@ -105,7 +105,7 @@ The recipient can then access the message headers via the `headers` property on
----
include::topic-based-messaging.snippets.ts[tags=receive-message-with-headers]
----
<1> Prints received message headers to the console: `{"sensor-type" => "analog"}`.
<1> Prints received message headers to the console: `{"authorization" => "Bearer <token>"}`.

[NOTE]
====
Expand All @@ -129,14 +129,23 @@ The communication is initiated by the publisher by sending a request (instead of
The following code snippet shows how to initiate a _request-response_ communication and receiving replies.
[source,typescript]
----
include::topic-based-messaging.snippets.ts[tags=request]
include::topic-based-messaging.snippets.ts[tags=send-request]
----
<1> Initiates a _request-response_ communication by invoking the `request$` method on the `MessageClient`.
<2> Prints the received replies to the console.

Similar to publishing a retained message, a request can also be marked as retained, instructing the broker to store it in the broker and deliver it to new subscribers, even if they subscribe after the request has been sent. Unlike retained messages, retained requests are not replaced by later retained requests or messages and remain in the broker until the requestor unsubscribes.

The following code snippet illustrates how to send a retained request.
[source,typescript]
----
include::topic-based-messaging.snippets.ts[tags=send-retained-request]
----
<1> The `retain` flag instructs the broker to retain this request until unsubscribed.

[NOTE]
====
In request-response communication, by default, the requestor’s Observable never completes. However, the replier can include a response status code in the reply’s headers, allowing to control the lifecycle of the requestor’s Observable. For example, the status code `250` `ResponseStatusCodes.TERMINAL` allows completing the requestor’s Observable after emitted the reply, or the status code `500` `ResponseStatusCodes.ERROR` to error the Observable. See the enum `ResponseStatusCodes` for available status codes.
In request-response communication, by default, the requestor’s Observable never completes. However, the replier can include the response status code in the reply’s headers, allowing to control the lifecycle of the requestor’s Observable. For example, the status code `250` `ResponseStatusCodes.TERMINAL` allows completing the requestor’s Observable after emitted the reply, or the status code `500` `ResponseStatusCodes.ERROR` to error the Observable. See the enum `ResponseStatusCodes` for available status codes.
If the replier does not complete the communication, the requestor can use the `take(1)` RxJS operator to unsubscribe upon the receipt of the first reply.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import { MessageClient, MessageHeaders, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage } from '@scion/microfrontend-platform';
import { Subject } from 'rxjs';
import { Beans } from '@scion/toolkit/bean-manager';
import { map, take } from 'rxjs/operators';
import {MessageClient, MessageHeaders, ResponseStatusCodes, takeUntilUnsubscribe, TopicMessage} from '@scion/microfrontend-platform';
import {Subject} from 'rxjs';
import {Beans} from '@scion/toolkit/bean-manager';
import {map, take} from 'rxjs/operators';

{
// tag::publish[]
const topic: string = 'myhome/livingroom/temperature'; // <1>
const payload: any = '22 °C';
const topic = 'myhome/livingroom/temperature'; // <1>

Beans.get(MessageClient).publish(topic, payload); // <2>
Beans.get(MessageClient).publish(topic, '22°C'); // <2>
// end::publish[]
}

{
// tag::subscribe[]
const topic: string = 'myhome/livingroom/temperature'; // <1>
const topic = 'myhome/livingroom/temperature'; // <1>

Beans.get(MessageClient).observe$(topic).subscribe((message: TopicMessage) => {
console.log(message.body); // <2>
Expand All @@ -24,7 +23,7 @@ import { map, take } from 'rxjs/operators';

{
// tag::subscribe-with-wildcard-segments[]
const topic: string = 'myhome/:room/temperature'; // <1>
const topic = 'myhome/:room/temperature'; // <1>

Beans.get(MessageClient).observe$(topic).subscribe((message: TopicMessage) => {
console.log(message.params); // <2>
Expand All @@ -34,26 +33,23 @@ import { map, take } from 'rxjs/operators';

{
// tag::publish-retained-message[]
const topic: string = 'myhome/livingroom/temperature';
const payload: any = '22 °C';

Beans.get(MessageClient).publish(topic, payload, {retain: true}); // <1>
const topic = 'myhome/livingroom/temperature';
Beans.get(MessageClient).publish(topic, '22°C', {retain: true}); // <1>
// end::publish-retained-message[]
}

{
// tag::publish-message-with-headers[]
const topic: string = 'myhome/livingroom/temperature';
const payload: any = '22 °C';
const headers = new Map().set('sensor-type', 'analog'); // <1>
const topic = 'myhome/livingroom/temperature';
const headers = new Map().set('authorization', 'Bearer <token>'); // <1>

Beans.get(MessageClient).publish(topic, payload, {headers: headers});
Beans.get(MessageClient).publish(topic, '22°C', {headers: headers});
// end::publish-message-with-headers[]
}

{
// tag::receive-message-with-headers[]
const topic: string = 'myhome/livingroom/temperature';
const topic = 'myhome/livingroom/temperature';

Beans.get(MessageClient).observe$(topic).subscribe((message: TopicMessage) => {
console.log(message.headers); // <1>
Expand All @@ -62,20 +58,30 @@ import { map, take } from 'rxjs/operators';
}

{
// tag::request[]
const topic: string = 'myhome/livingroom/temperature';
// tag::send-request[]
const topic = 'myhome/livingroom/temperature';

Beans.get(MessageClient).request$(topic).subscribe(reply => { // <1>
console.log(reply.body); // <2>
});
// end::request[]
// end::send-request[]
}

{
// tag::send-retained-request[]
const topic = 'myhome/livingroom/temperature';

Beans.get(MessageClient).request$(topic, undefined, {retain: true}).subscribe(reply => { // <1>
console.log(reply.body);
});
// end::send-retained-request[]
}

{
const sensor$ = new Subject<number>();

// tag::reply[]
const topic: string = 'myhome/livingroom/temperature';
const topic = 'myhome/livingroom/temperature';

// Stream data as long as the requestor is subscribed to receive replies.
Beans.get(MessageClient).observe$(topic).subscribe((request: TopicMessage) => {
Expand All @@ -84,7 +90,7 @@ import { map, take } from 'rxjs/operators';
sensor$
.pipe(takeUntilUnsubscribe(replyTo)) // <3>
.subscribe(temperature => {
Beans.get(MessageClient).publish(replyTo, `${temperature} °C`); // <2>
Beans.get(MessageClient).publish(replyTo, `${temperature}°C`); // <2>
});
});

Expand All @@ -96,7 +102,7 @@ import { map, take } from 'rxjs/operators';
sensor$
.pipe(take(1))
.subscribe(temperature => {
Beans.get(MessageClient).publish(replyTo, `${temperature} °C`, {headers});
Beans.get(MessageClient).publish(replyTo, `${temperature}°C`, {headers});
});

});
Expand All @@ -107,16 +113,16 @@ import { map, take } from 'rxjs/operators';
const sensor$ = new Subject<number>();

// tag::onMessage[]
const topic: string = 'myhome/livingroom/temperature';
const topic = 'myhome/livingroom/temperature';

// Stream data as long as the requestor is subscribed to receive replies.
Beans.get(MessageClient).onMessage(topic, message => {
return sensor$.pipe(map(temperature => `${temperature} °C`));
return sensor$.pipe(map(temperature => `${temperature}°C`));
});

// Alternatively, you can complete the requestor's Observable with the first reply.
Beans.get(MessageClient).onMessage(topic, message => {
return sensor$.pipe(map(temperature => `${temperature} °C`), take(1));
return sensor$.pipe(map(temperature => `${temperature}°C`), take(1));
});
// end::onMessage[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ test.describe('Messaging', () => {
await TopicBasedMessagingSpecs.receiveRetainedMessagesSpec(testingAppPO);
});

test('allows receiving retained requests', async ({testingAppPO}) => {
await TopicBasedMessagingSpecs.receiveRetainedRequestsSpec(testingAppPO);
});

test('allows receiving messages without a payload', async ({testingAppPO}) => {
await TopicBasedMessagingSpecs.receiveMessagesWithoutPayloadSpec(testingAppPO);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ export namespace TopicBasedMessagingSpecs {
await publisherPO.toggleRequestReply(true);
await publisherPO.clickPublish();

await expect(await publisherPO.getPublishError()).toContain('[RequestReplyError]');
await expect(await publisherPO.getPublishError()).toContain('[MessagingError]');
}

/**
Expand Down Expand Up @@ -453,7 +453,7 @@ export namespace TopicBasedMessagingSpecs {
await expect(await (await receiverApp1PO.getFirstMessageOrElseReject()).getBody()).toEqual('retained message');

// test to receive retained message in app-2
let receiverApp2PO = await receiverOutletPO.enterUrl<ReceiveMessagePagePO>({useClass: ReceiveMessagePagePO, origin: TestingAppOrigins.APP_2});
const receiverApp2PO = await receiverOutletPO.enterUrl<ReceiveMessagePagePO>({useClass: ReceiveMessagePagePO, origin: TestingAppOrigins.APP_2});
await receiverApp2PO.selectFlavor(MessagingFlavor.Topic);
await receiverApp2PO.enterTopic('some-topic');
await receiverApp2PO.clickSubscribe();
Expand All @@ -468,12 +468,71 @@ export namespace TopicBasedMessagingSpecs {
await expect(receiverApp2PO.getFirstMessageOrElseReject()).rejects.toThrow(/\[NoMessageFoundError]/);

// test not to receive the retained message in app-4
receiverApp2PO = await receiverOutletPO.enterUrl<ReceiveMessagePagePO>({useClass: ReceiveMessagePagePO, origin: TestingAppOrigins.APP_4});
await receiverApp2PO.selectFlavor(MessagingFlavor.Topic);
await receiverApp2PO.enterTopic('some-topic');
await receiverApp2PO.clickSubscribe();
const receiverApp4PO = await receiverOutletPO.enterUrl<ReceiveMessagePagePO>({useClass: ReceiveMessagePagePO, origin: TestingAppOrigins.APP_4});
await receiverApp4PO.selectFlavor(MessagingFlavor.Topic);
await receiverApp4PO.enterTopic('some-topic');
await receiverApp4PO.clickSubscribe();

await expect(receiverApp2PO.getFirstMessageOrElseReject()).rejects.toThrow(/\[NoMessageFoundError]/);
await expect(receiverApp4PO.getFirstMessageOrElseReject()).rejects.toThrow(/\[NoMessageFoundError]/);
}

/**
* Tests receiving requests which are retained on the broker.
*/
export async function receiveRetainedRequestsSpec(testingAppPO: TestingAppPO): Promise<void> {
const pagePOs = await testingAppPO.navigateTo({
publisher_app2: {useClass: PublishMessagePagePO, origin: TestingAppOrigins.APP_2},
receiver: 'about:blank',
});

// publish a retained request from app-1
const publisherPO = pagePOs.get<PublishMessagePagePO>('publisher_app2');
await publisherPO.selectFlavor(MessagingFlavor.Topic);
await publisherPO.enterTopic('some-topic');
await publisherPO.toggleRetain(true);
await publisherPO.toggleRequestReply(true);
await publisherPO.enterMessage('retained request');
await publisherPO.clickPublish();

const receiverOutletPO = pagePOs.get<BrowserOutletPO>('receiver');

// test to receive retained message in app-2
const receiverPO = await receiverOutletPO.enterUrl<ReceiveMessagePagePO>({useClass: ReceiveMessagePagePO, origin: TestingAppOrigins.APP_2});
await receiverPO.selectFlavor(MessagingFlavor.Topic);
await receiverPO.enterTopic('some-topic');
await receiverPO.clickSubscribe();
const requestPO = await receiverPO.getFirstMessageOrElseReject();
const replyTo = await requestPO.getReplyTo();
await expect(await requestPO.getBody()).toEqual('retained request');
await expect(replyTo).not.toBeUndefined();

// send reply
await requestPO.clickReply();

// expect the reply to be received
const reply1PO = await publisherPO.getFirstReplyOrElseReject();
await expect(await reply1PO.getTopic()).toEqual(replyTo);
await expect(await reply1PO.getBody()).toEqual('this is a reply');
await expect(await reply1PO.getReplyTo()).toBeUndefined();

// clear the replies list
await publisherPO.clickClearReplies();
await expect(await publisherPO.getReplies()).toEqual([]);

// send another reply
await requestPO.clickReply();
const replyPO = await publisherPO.getFirstReplyOrElseReject();
await expect(await replyPO.getTopic()).toEqual(replyTo);
await expect(await replyPO.getBody()).toEqual('this is a reply');
await expect(await replyPO.getReplyTo()).toBeUndefined();

// cancel subscription of requestor
await publisherPO.clickCancel();

// expect retained request to be deleted
await receiverPO.clickUnsubscribe();
await receiverPO.clickSubscribe();
await expect(receiverPO.getFirstMessageOrElseReject()).rejects.toThrow(/\[NoMessageFoundError]/);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
take(1),
pluckMessage(),
timeout({first: this._messageDeliveryTimeout, with: () => throwError(() => GatewayErrors.MESSAGE_DISPATCH_ERROR(this._messageDeliveryTimeout, envelope))}),
mergeMap(statusMessage => statusMessage.body!.ok ? EMPTY : throwError(() => statusMessage.body!.details)),
mergeMap(statusMessage => statusMessage.body!.ok ? EMPTY : throwError(() => Error(statusMessage.body!.details))),
takeUntil(this._platformStopping$),
)
.subscribe({
Expand Down Expand Up @@ -338,7 +338,7 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
mergeMap((messageEvent: MessageEvent<MessageEnvelope<TopicMessage<ConnackMessage>>>) => {
const response: ConnackMessage | undefined = messageEvent.data.message.body;
if (response?.returnCode !== 'accepted') {
return throwError(() => `${response?.returnMessage ?? 'UNEXPECTED: Empty broker discovery response'} [code: '${response?.returnCode ?? 'n/a'}']`);
return throwError(() => Error(`${response?.returnMessage ?? 'UNEXPECTED: Empty broker discovery response'} [code: '${response?.returnCode ?? 'n/a'}']`));
}
return of<Session>({
clientId: response.clientId!,
Expand Down
Loading

0 comments on commit 1098f8f

Please sign in to comment.