Skip to content

Commit

Permalink
feat(rabbitmq): added error callbacks in favor of error behaviors
Browse files Browse the repository at this point in the history
  • Loading branch information
perf2711 authored and WonderPanda committed Apr 16, 2020
1 parent bc71ffa commit 85b1b67
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 73 deletions.
20 changes: 20 additions & 0 deletions integration/rabbitmq/src/rpc/reply.error.callback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import * as amqplib from 'amqplib';

export function ReplyErrorCallback(
channel: amqplib.Channel,
msg: amqplib.ConsumeMessage,
error: any,
) {
const { replyTo, correlationId } = msg.properties;
if (replyTo) {
if (error instanceof Error) {
error = error.message;
} else if (typeof error !== 'string') {
error = JSON.stringify(error);
}

error = Buffer.from(JSON.stringify({ status: 'error', message: error }));

channel.publish('', replyTo, error, { correlationId });
}
}
4 changes: 3 additions & 1 deletion integration/rabbitmq/src/rpc/rpc.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
import { Injectable, UseInterceptors } from '@nestjs/common';
import { TransformInterceptor } from '../transform.interceptor';
import { RpcException } from '@nestjs/microservices';
import { ReplyErrorCallback } from './reply.error.callback';

@Injectable()
export class RpcService {
Expand Down Expand Up @@ -35,7 +36,8 @@ export class RpcService {
routingKey: 'error-reply-rpc',
exchange: 'exchange1',
queue: 'error-reply-rpc',
errorBehavior: MessageHandlerErrorBehavior.REPLYERRORANDACK,
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorCallbacks: [ReplyErrorCallback],
})
errorReplyRpc(message: object) {
throw new RpcException(message);
Expand Down
103 changes: 37 additions & 66 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,8 @@ export class AmqpConnection {
const errorBehavior =
msgOptions.errorBehavior ||
this.config.defaultSubscribeErrorBehavior;
switch (errorBehavior) {
case MessageHandlerErrorBehavior.ACK: {
channel.ack(msg);
break;
}
case MessageHandlerErrorBehavior.REQUEUE: {
channel.nack(msg, false, true);
break;
}
default:
channel.nack(msg, false, false);
}

await this.handleError(channel, msgOptions, errorBehavior, msg, e);
}
}
});
Expand Down Expand Up @@ -355,22 +345,8 @@ export class AmqpConnection {
} else {
const errorBehavior =
rpcOptions.errorBehavior || this.config.defaultRpcErrorBehavior;
switch (errorBehavior) {
case MessageHandlerErrorBehavior.ACK: {
channel.ack(msg);
break;
}
case MessageHandlerErrorBehavior.REQUEUE: {
channel.nack(msg, false, true);
break;
}
case MessageHandlerErrorBehavior.REPLYERRORANDACK: {
this.handleReplyAndAckError(channel, msg, e);
break;
}
default:
channel.nack(msg, false, false);
}

await this.handleError(channel, rpcOptions, errorBehavior, msg, e);
}
}
});
Expand Down Expand Up @@ -401,44 +377,6 @@ export class AmqpConnection {
this._channel.publish(exchange, routingKey, buffer, options);
}

private handleReplyAndAckError(
channel: amqplib.Channel,
msg: amqplib.ConsumeMessage,
error: any
) {
try {
const { replyTo, correlationId } = msg.properties;
if (replyTo) {
this.publishError('', replyTo, error, { correlationId });
channel.ack(msg);
} else {
channel.nack(msg, false, false);
}
} catch {
channel.nack(msg, false, true);
}
}

private publishError(
exchange: string,
routingKey: string,
error: any,
options?: amqplib.Options.Publish
) {
if (error instanceof Error) {
error = error.message;
} else if (typeof error !== 'string') {
error = JSON.stringify(error);
}

this.publish(
exchange,
routingKey,
{ status: 'error', message: error },
options
);
}

private handleMessage<T, U>(
handler: (
msg: T | undefined,
Expand All @@ -463,4 +401,37 @@ export class AmqpConnection {

return handler(message, msg);
}

private async handleError(
channel: amqplib.Channel,
msgOptions: MessageHandlerOptions,
errorBehavior: MessageHandlerErrorBehavior,
msg: amqplib.Message,
error: any
) {
if (msg == null) {
return;
} else {
try {
if (msgOptions.errorCallbacks) {
for (const callback of msgOptions.errorCallbacks) {
await callback(channel, msg, error);
}
}
} finally {
switch (errorBehavior) {
case MessageHandlerErrorBehavior.ACK: {
channel.ack(msg);
break;
}
case MessageHandlerErrorBehavior.REQUEUE: {
channel.nack(msg, false, true);
break;
}
default:
channel.nack(msg, false, false);
}
}
}
}
}
13 changes: 7 additions & 6 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ export enum MessageHandlerErrorBehavior {
ACK,
NACK,
REQUEUE,
/**
* If an exception occurs while handling the message, the error will be serialized and published on the `replyTo` queue.
* If `replyTo` is not provided, the message will be NACKed without requeueing.
* If publish fails, message will be NACKed and requeued.
*/
REPLYERRORANDACK,
}

export interface MessageHandlerOptions {
Expand All @@ -51,9 +45,16 @@ export interface MessageHandlerOptions {
queue?: string;
queueOptions?: QueueOptions;
errorBehavior?: MessageHandlerErrorBehavior;
errorCallbacks?: IMessageErrorCallback[];
allowNonJsonMessages?: boolean;
}

export type IMessageErrorCallback = (
channel: amqplib.Channel,
msg: amqplib.ConsumeMessage,
error: any
) => Promise<any> | any;

export interface ConnectionInitOptions {
wait?: boolean;
timeout?: number;
Expand Down

0 comments on commit 85b1b67

Please sign in to comment.