Skip to content

Commit

Permalink
fix(platform): report message rejection of asynchronous interceptors …
Browse files Browse the repository at this point in the history
…back to the sender

closes #147

BREAKING CHANGE: Fixing message rejection reporting of interceptors introduced a breaking change in the Interceptor API.

Note: The messaging protocol between host and client HAS NOT CHANGED.

In order to report asynchronous message rejection correctly, the Interceptor API has been refactored from synchronous to asynchronous message interception. To migrate, change the return type of your interceptors from `void` to `Promise<void>` and update the method body accordingly.

### The following snippets illustrate how a migration could look like:

#### Before migration

```typescript
class MessageLoggerInterceptor implements MessageInterceptor {
  public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
    console.log(message);

    // Passes the message along the interceptor chain.
    next.handle(message);
  }
}
```

#### After migration

```typescript
class MessageLoggerInterceptor implements MessageInterceptor {
  public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
    console.log(message);

    // Passes the message along the interceptor chain.
    return next.handle(message);
  }
}
```
  • Loading branch information
mofogasy committed Sep 30, 2022
1 parent c62aca9 commit a0387f2
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,36 +122,54 @@ export class PlatformInitializer implements OnDestroy {

if (queryParams.has('intercept-message:reject')) {
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:reject')) {
throw Error('Message rejected by interceptor');
}
next.handle(message);
return next.handle(message);
}
};
Beans.register(MessageInterceptor, {useValue: interceptor, multi: true});
}

if (queryParams.has('intercept-message:reject-async')) {
const interceptor1 = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
return next.handle(message);
}
};
const interceptor2 = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:reject-async')) {
return Promise.reject('Message rejected (async) by interceptor');
}
return next.handle(message);
}
};
Beans.register(MessageInterceptor, {useValue: interceptor1, multi: true});
Beans.register(MessageInterceptor, {useValue: interceptor2, multi: true});
}

if (queryParams.has('intercept-message:swallow')) {
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:swallow')) {
return;
return Promise.resolve();
}
next.handle(message);
return next.handle(message);
}
};
Beans.register(MessageInterceptor, {useValue: interceptor, multi: true});
}

if (queryParams.has('intercept-message:uppercase')) {
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage<string>, next: Handler<TopicMessage<string>>): void {
public intercept(message: TopicMessage<string>, next: Handler<TopicMessage<string>>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:uppercase')) {
next.handle({...message, body: message.body.toUpperCase()});
return next.handle({...message, body: message.body.toUpperCase()});
}
else {
next.handle(message);
return next.handle(message);
}
}
};
Expand All @@ -163,35 +181,35 @@ export class PlatformInitializer implements OnDestroy {
const queryParams = this._queryParams;
if (queryParams.has('intercept-intent:reject')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): void {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:reject')) {
throw Error('Intent rejected by interceptor');
}
next.handle(message);
return next.handle(message);
}
};
Beans.register(IntentInterceptor, {useValue: interceptor, multi: true});
}
if (queryParams.has('intercept-intent:swallow')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): void {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:swallow')) {
return;
return Promise.resolve();
}
next.handle(message);
return next.handle(message);
}
};
Beans.register(IntentInterceptor, {useValue: interceptor, multi: true});
}
// Continues the interceptor chain with the message body put into uppercase.
if (queryParams.has('intercept-intent:uppercase')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): void {
public intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:uppercase')) {
next.handle({...message, body: message.body.toUpperCase()});
return next.handle({...message, body: message.body.toUpperCase()});
}
else {
next.handle(message);
return next.handle(message);
}
}
};
Expand All @@ -200,12 +218,12 @@ export class PlatformInitializer implements OnDestroy {
// Continues the interceptor chain with the message body replaced with the stringified capability.
if (queryParams.has('intercept-intent:capability-present')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): void {
public intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:capability-present')) {
next.handle({...message, body: JSON.stringify(message.capability)});
return next.handle({...message, body: JSON.stringify(message.capability)});
}
else {
next.handle(message);
return next.handle(message);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import {Beans} from '@scion/toolkit/bean-manager';
/** Message Interceptor */
class MessageLoggerInterceptor implements MessageInterceptor {

public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
console.log(message);

// Passes the message along the interceptor chain.
next.handle(message);
return next.handle(message);
}
}

/** Intent Interceptor */
class IntentLoggerInterceptor implements IntentInterceptor {

public intercept(intent: IntentMessage<any>, next: Handler<IntentMessage>): void {
public intercept(intent: IntentMessage<any>, next: Handler<IntentMessage>): Promise<void> {
console.log(intent);

// Passes the intent along the interceptor chain.
next.handle(intent);
return next.handle(intent);
}
}

Expand Down Expand Up @@ -65,17 +65,15 @@ import {Beans} from '@scion/toolkit/bean-manager';
this.schemaValidator = new JsonSchemaValidator(jsonSchema); // <2>
}

public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
// Pass messages sent to other topics.
if (!this.topicMatcher.match(message.topic).matches) {
next.handle(message); // <3>
return;
return next.handle(message); // <3>
}

// Validate the payload of the message.
if (this.schemaValidator.isValid(message.body)) {
next.handle(message); // <4>
return;
return next.handle(message); // <4>
}

throw Error('Message failed schema validation'); // <5>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ describe('Messaging', () => {
await TopicBasedMessagingSpecs.interceptMessageRejectSpec();
});

it('allows rejecting messages (async)', async () => {
await TopicBasedMessagingSpecs.interceptMessageRejectAsyncSpec();
});

it('allows swallowing messages', async () => {
await TopicBasedMessagingSpecs.interceptMessageSwallowSpec();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,32 @@ export namespace TopicBasedMessagingSpecs {
await expect(await receiverPO.getMessages()).toEqual([]);
}

/**
* Tests message rejection (async).
* The testing app is configured to reject messages sent to the topic 'reject-async'.
*/
export async function interceptMessageRejectAsyncSpec(): Promise<void> {
const testingAppPO = new TestingAppPO();
const pagePOs = await testingAppPO.navigateTo({
publisher: PublishMessagePagePO,
receiver: ReceiveMessagePagePO,
}, {queryParams: new Map().set('intercept-message:reject-async', 'reject-async')});

const receiverPO = pagePOs.get<ReceiveMessagePagePO>('receiver');
await receiverPO.selectFlavor(MessagingFlavor.Topic);
await receiverPO.enterTopic('reject-async');
await receiverPO.clickSubscribe();

const publisherPO = pagePOs.get<PublishMessagePagePO>('publisher');
await publisherPO.selectFlavor(MessagingFlavor.Topic);
await publisherPO.enterTopic('reject-async');
await publisherPO.enterMessage('payload');
await publisherPO.clickPublish();

await expect(await publisherPO.getPublishError()).toEqual('Message rejected (async) by interceptor');
await expect(await receiverPO.getMessages()).toEqual([]);
}

/**
* Tests swallowing a message.
* The testing app is configured to swallow messages sent to the topic 'swallow'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,14 @@ describe('Messaging', () => {
it('should allow an interceptor to handle a \'request-response\' intent message if no replier is running', async () => {
// create an interceptor which handles intents of a given type and then swallows the message
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): void {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): Promise<void> {
if (message.intent.type === 'some-capability') {
const replyTo = message.headers.get(MessageHeaders.ReplyTo);
const body = message.body;
Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
return Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
}
else {
next.handle(message);
return next.handle(message);
}
}
};
Expand Down Expand Up @@ -640,14 +640,14 @@ describe('Messaging', () => {
it('should allow an interceptor to handle a \'request-response\' topic message if no replier is running', async () => {
// create an interceptor which handles messages of a given topic and then swallows the message
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === 'some-topic') {
const replyTo = message.headers.get(MessageHeaders.ReplyTo);
const body = message.body;
Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
return Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
}
else {
next.handle(message);
return next.handle(message);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,13 @@ export class MessageBroker implements Initializer, PreDestroy {
filter(message => message.data.message.topic !== PlatformTopics.RequestSubscriberCount), // do not dispatch messages sent to the `RequestSubscriberCount` topic as handled separately
takeUntil(this._destroy$),
)
.subscribe((event: MessageEvent<MessageEnvelope<TopicMessage>>) => runSafe(() => {
.subscribe((event: MessageEvent<MessageEnvelope<TopicMessage>>) => runSafe(async () => {
const client = getSendingClient(event);
const topicMessage = event.data.message;
const messageId = topicMessage.headers.get(MessageHeaders.MessageId);

try {
this._messagePublisher.publish(topicMessage);
await this._messagePublisher.publish(topicMessage);
sendDeliveryStatusSuccess(client, messageId);
}
catch (error) {
Expand All @@ -341,7 +341,7 @@ export class MessageBroker implements Initializer, PreDestroy {
filterByChannel<IntentMessage>(MessagingChannel.Intent),
takeUntil(this._destroy$),
)
.subscribe((event: MessageEvent<MessageEnvelope<IntentMessage>>) => runSafe(() => {
.subscribe((event: MessageEvent<MessageEnvelope<IntentMessage>>) => runSafe(async () => {
const client = getSendingClient(event);
const intentMessage = event.data.message;
const messageId = intentMessage.headers.get(MessageHeaders.MessageId);
Expand Down Expand Up @@ -402,7 +402,7 @@ export class MessageBroker implements Initializer, PreDestroy {
}

try {
capabilities.forEach(capability => this._intentPublisher.publish({...intentMessage, capability}));
await Promise.all(capabilities.map(capability => this._intentPublisher.publish({...intentMessage, capability})));
sendDeliveryStatusSuccess(client, messageId);
}
catch (error) {
Expand All @@ -415,7 +415,7 @@ export class MessageBroker implements Initializer, PreDestroy {
* Creates the interceptor chain to intercept message publishing. The publisher is added as terminal handler.
*/
private createMessagePublisher(): PublishInterceptorChain<TopicMessage> {
return chainInterceptors(Beans.all(MessageInterceptor), (message: TopicMessage): void => {
return chainInterceptors(Beans.all(MessageInterceptor), async (message: TopicMessage): Promise<void> => {
// If the message is marked as 'retained', store it, or if without a body, delete it.
if (message.retain && this._retainedMessageRegistry.persistOrDelete(message) === 'deleted') {
return; // Deletion events for retained messages are swallowed.
Expand All @@ -435,7 +435,7 @@ export class MessageBroker implements Initializer, PreDestroy {
* Creates the interceptor chain to intercept intent publishing. The publisher is added as terminal handler.
*/
private createIntentPublisher(): PublishInterceptorChain<IntentMessage> {
return chainInterceptors(Beans.all(IntentInterceptor), (message: IntentMessage): void => {
return chainInterceptors(Beans.all(IntentInterceptor), async (message: IntentMessage): Promise<void> => {
const capability = Defined.orElseThrow(message.capability, () => Error(`[IllegalStateError] Missing target capability on intent message: ${JSON.stringify(message)}`));
const clients = this._clientRegistry.getByApplication(capability.metadata!.appSymbolicName);

Expand Down
Loading

0 comments on commit a0387f2

Please sign in to comment.