Skip to content

Commit

Permalink
fix(platform): report message rejection of asynchronous interceptors …
Browse files Browse the repository at this point in the history
…back to the sender

closes #147

BREAKING CHANGE: Fixing message rejection reporting of interceptors introduced a breaking change in the Interceptor API.

In order to report asynchronous message rejection correctly, the Interceptor API has been refactored from synchronous to asynchronous message interception. To migrate, change the return type of your interceptors from `void` to `Promise<void>` and update the method body accordingly.

### The following snippets illustrate how a migration could look like:

#### Before migration

```typescript
class MessageLoggerInterceptor implements MessageInterceptor {
  public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
    console.log(message);

    // Passes the message along the interceptor chain.
    next.handle(message);
  }
}
```

#### After migration

```typescript
class MessageLoggerInterceptor implements MessageInterceptor {
  public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
    console.log(message);

    // Passes the message along the interceptor chain.
    await next.handle(message);
  }
}
```
  • Loading branch information
mofogasy committed Aug 19, 2022
1 parent c62aca9 commit e22ffcc
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,36 +122,54 @@ export class PlatformInitializer implements OnDestroy {

if (queryParams.has('intercept-message:reject')) {
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:reject')) {
throw Error('Message rejected by interceptor');
}
next.handle(message);
await next.handle(message);
}
};
Beans.register(MessageInterceptor, {useValue: interceptor, multi: true});
}

if (queryParams.has('intercept-message:reject-async')) {
const interceptor1 = new class implements MessageInterceptor {
public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
return next.handle(message);
}
};
const interceptor2 = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:reject-async')) {
return Promise.reject('Message rejected (async) by interceptor');
}
return next.handle(message);
}
};
Beans.register(MessageInterceptor, {useValue: interceptor1, multi: true});
Beans.register(MessageInterceptor, {useValue: interceptor2, multi: true});
}

if (queryParams.has('intercept-message:swallow')) {
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:swallow')) {
return;
}
next.handle(message);
await next.handle(message);
}
};
Beans.register(MessageInterceptor, {useValue: interceptor, multi: true});
}

if (queryParams.has('intercept-message:uppercase')) {
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage<string>, next: Handler<TopicMessage<string>>): void {
public async intercept(message: TopicMessage<string>, next: Handler<TopicMessage<string>>): Promise<void> {
if (message.topic === queryParams.get('intercept-message:uppercase')) {
next.handle({...message, body: message.body.toUpperCase()});
await next.handle({...message, body: message.body.toUpperCase()});
}
else {
next.handle(message);
await next.handle(message);
}
}
};
Expand All @@ -163,35 +181,35 @@ export class PlatformInitializer implements OnDestroy {
const queryParams = this._queryParams;
if (queryParams.has('intercept-intent:reject')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): void {
public async intercept(message: IntentMessage, next: Handler<IntentMessage>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:reject')) {
throw Error('Intent rejected by interceptor');
}
next.handle(message);
await next.handle(message);
}
};
Beans.register(IntentInterceptor, {useValue: interceptor, multi: true});
}
if (queryParams.has('intercept-intent:swallow')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): void {
public async intercept(message: IntentMessage, next: Handler<IntentMessage>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:swallow')) {
return;
}
next.handle(message);
await next.handle(message);
}
};
Beans.register(IntentInterceptor, {useValue: interceptor, multi: true});
}
// Continues the interceptor chain with the message body put into uppercase.
if (queryParams.has('intercept-intent:uppercase')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): void {
public async intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:uppercase')) {
next.handle({...message, body: message.body.toUpperCase()});
await next.handle({...message, body: message.body.toUpperCase()});
}
else {
next.handle(message);
await next.handle(message);
}
}
};
Expand All @@ -200,12 +218,12 @@ export class PlatformInitializer implements OnDestroy {
// Continues the interceptor chain with the message body replaced with the stringified capability.
if (queryParams.has('intercept-intent:capability-present')) {
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): void {
public async intercept(message: IntentMessage<string>, next: Handler<IntentMessage<string>>): Promise<void> {
if (message.intent.type === queryParams.get('intercept-intent:capability-present')) {
next.handle({...message, body: JSON.stringify(message.capability)});
await next.handle({...message, body: JSON.stringify(message.capability)});
}
else {
next.handle(message);
await next.handle(message);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import {Beans} from '@scion/toolkit/bean-manager';
/** Message Interceptor */
class MessageLoggerInterceptor implements MessageInterceptor {

public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
console.log(message);

// Passes the message along the interceptor chain.
next.handle(message);
await next.handle(message);
}
}

/** Intent Interceptor */
class IntentLoggerInterceptor implements IntentInterceptor {

public intercept(intent: IntentMessage<any>, next: Handler<IntentMessage>): void {
public async intercept(intent: IntentMessage<any>, next: Handler<IntentMessage>): Promise<void> {
console.log(intent);

// Passes the intent along the interceptor chain.
next.handle(intent);
await next.handle(intent);
}
}

Expand Down Expand Up @@ -65,17 +65,15 @@ import {Beans} from '@scion/toolkit/bean-manager';
this.schemaValidator = new JsonSchemaValidator(jsonSchema); // <2>
}

public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
// Pass messages sent to other topics.
if (!this.topicMatcher.match(message.topic).matches) {
next.handle(message); // <3>
return;
return next.handle(message); // <3>
}

// Validate the payload of the message.
if (this.schemaValidator.isValid(message.body)) {
next.handle(message); // <4>
return;
return next.handle(message); // <4>
}

throw Error('Message failed schema validation'); // <5>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ describe('Messaging', () => {
await TopicBasedMessagingSpecs.interceptMessageRejectSpec();
});

it('allows swallowing messages', async () => {
await TopicBasedMessagingSpecs.interceptMessageSwallowSpec();
it('allows rejecting messages (async)', async ({testingAppPO}) => {
await TopicBasedMessagingSpecs.interceptMessageRejectAsyncSpec(testingAppPO);
});

it('allows swallowing messages', async ({testingAppPO}) => {
await TopicBasedMessagingSpecs.interceptMessageSwallowSpec(testingAppPO);
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,31 @@ export namespace TopicBasedMessagingSpecs {
await expect(await receiverPO.getMessages()).toEqual([]);
}

/**
* Tests message rejection (async).
* The testing app is configured to reject messages sent to the topic 'reject-async'.
*/
export async function interceptMessageRejectAsyncSpec(testingAppPO: TestingAppPO, ): Promise<void> {
const pagePOs = await testingAppPO.navigateTo({
publisher: PublishMessagePagePO,
receiver: ReceiveMessagePagePO,
}, {queryParams: new Map().set('intercept-message:reject-async', 'reject-async')});

const receiverPO = pagePOs.get<ReceiveMessagePagePO>('receiver');
await receiverPO.selectFlavor(MessagingFlavor.Topic);
await receiverPO.enterTopic('reject-async');
await receiverPO.clickSubscribe();

const publisherPO = pagePOs.get<PublishMessagePagePO>('publisher');
await publisherPO.selectFlavor(MessagingFlavor.Topic);
await publisherPO.enterTopic('reject-async');
await publisherPO.enterMessage('payload');
await publisherPO.clickPublish();

await expect(await publisherPO.getPublishError()).toEqual('Message rejected (async) by interceptor');
await expect(await receiverPO.getMessages()).toEqual([]);
}

/**
* Tests swallowing a message.
* The testing app is configured to swallow messages sent to the topic 'swallow'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,14 @@ describe('Messaging', () => {
it('should allow an interceptor to handle a \'request-response\' intent message if no replier is running', async () => {
// create an interceptor which handles intents of a given type and then swallows the message
const interceptor = new class implements IntentInterceptor {
public intercept(message: IntentMessage, next: Handler<IntentMessage>): void {
public async intercept(message: IntentMessage, next: Handler<IntentMessage>): Promise<void> {
if (message.intent.type === 'some-capability') {
const replyTo = message.headers.get(MessageHeaders.ReplyTo);
const body = message.body;
Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
await Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
}
else {
next.handle(message);
await next.handle(message);
}
}
};
Expand Down Expand Up @@ -640,14 +640,14 @@ describe('Messaging', () => {
it('should allow an interceptor to handle a \'request-response\' topic message if no replier is running', async () => {
// create an interceptor which handles messages of a given topic and then swallows the message
const interceptor = new class implements MessageInterceptor {
public intercept(message: TopicMessage, next: Handler<TopicMessage>): void {
public async intercept(message: TopicMessage, next: Handler<TopicMessage>): Promise<void> {
if (message.topic === 'some-topic') {
const replyTo = message.headers.get(MessageHeaders.ReplyTo);
const body = message.body;
Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
await Beans.get(MessageClient).publish(replyTo, body.toUpperCase());
}
else {
next.handle(message);
await next.handle(message);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,13 @@ export class MessageBroker implements Initializer, PreDestroy {
filter(message => message.data.message.topic !== PlatformTopics.RequestSubscriberCount), // do not dispatch messages sent to the `RequestSubscriberCount` topic as handled separately
takeUntil(this._destroy$),
)
.subscribe((event: MessageEvent<MessageEnvelope<TopicMessage>>) => runSafe(() => {
.subscribe((event: MessageEvent<MessageEnvelope<TopicMessage>>) => runSafe(async () => {
const client = getSendingClient(event);
const topicMessage = event.data.message;
const messageId = topicMessage.headers.get(MessageHeaders.MessageId);

try {
this._messagePublisher.publish(topicMessage);
await this._messagePublisher.publish(topicMessage);
sendDeliveryStatusSuccess(client, messageId);
}
catch (error) {
Expand All @@ -341,7 +341,7 @@ export class MessageBroker implements Initializer, PreDestroy {
filterByChannel<IntentMessage>(MessagingChannel.Intent),
takeUntil(this._destroy$),
)
.subscribe((event: MessageEvent<MessageEnvelope<IntentMessage>>) => runSafe(() => {
.subscribe((event: MessageEvent<MessageEnvelope<IntentMessage>>) => runSafe(async () => {
const client = getSendingClient(event);
const intentMessage = event.data.message;
const messageId = intentMessage.headers.get(MessageHeaders.MessageId);
Expand Down Expand Up @@ -402,7 +402,7 @@ export class MessageBroker implements Initializer, PreDestroy {
}

try {
capabilities.forEach(capability => this._intentPublisher.publish({...intentMessage, capability}));
await Promise.all(capabilities.map(capability => this._intentPublisher.publish({...intentMessage, capability})));
sendDeliveryStatusSuccess(client, messageId);
}
catch (error) {
Expand Down
Loading

0 comments on commit e22ffcc

Please sign in to comment.