diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index 2dbd89f3f..3a22b66a8 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -2,6 +2,7 @@ import * as amqplib from 'amqplib'; import { interval, race, Subject } from 'rxjs'; import { filter, first, map } from 'rxjs/operators'; import * as uuid from 'uuid'; +import * as amqpcon from 'amqp-connection-manager'; import { MessageHandlerErrorBehavior, MessageHandlerOptions, @@ -28,9 +29,10 @@ const defaultConfig = { export class AmqpConnection { private messageSubject = new Subject(); - private _connection!: amqplib.Connection; - private _channel!: amqplib.Channel; + private _connection!: amqpcon.AmqpConnectionManager; + private _channel!: amqpcon.ChannelWrapper; private config: Required; + constructor(config: RabbitMQConfig) { this.config = { ...defaultConfig, ...config }; } @@ -44,22 +46,51 @@ export class AmqpConnection { } public async init() { - this._connection = await amqplib.connect(this.config.uri); - this._channel = await this._connection.createChannel(); + const connectionManager = amqpcon.connect([this.config.uri]); + this._connection = connectionManager; + + // this._channel = this._connection.createChannel({ + // setup: this.setupInitChannel.bind(this), + // }); + this._channel = this._connection.createChannel({}); + await this._channel.addSetup(this.setupInitChannel.bind(this)); + } - await Promise.all( - this.config.exchanges.map(async x => - this._channel.assertExchange( - x.name, - x.type || this.config.defaultExchangeType, - x.options - ) + private async setupInitChannel( + channel: amqplib.ConfirmChannel + ): Promise { + this.config.exchanges.map(async x => + channel.assertExchange( + x.name, + x.type || this.config.defaultExchangeType, + x.options ) ); - await this.channel.prefetch(this.config.prefetchCount); + await channel.prefetch(this.config.prefetchCount); + await this.initDirectReplyQueue(channel); + } - await this.initDirectReplyQueue(); + private async initDirectReplyQueue(channel: amqplib.ConfirmChannel) { + // Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality + await channel.consume( + DIRECT_REPLY_QUEUE, + async msg => { + if (msg == null) { + return; + } + + const correlationMessage: CorrelationMessage = { + correlationId: msg.properties.correlationId.toString(), + message: JSON.parse(msg.content.toString()) + }; + + this.messageSubject.next(correlationMessage); + }, + { + noAck: true + } + ); } public async request( @@ -99,9 +130,22 @@ export class AmqpConnection { ) => Promise, msgOptions: MessageHandlerOptions ) { + return this._channel.addSetup(channel => + this.setupSubscriberChannel(handler, msgOptions, channel) + ); + } + + private async setupSubscriberChannel( + handler: ( + msg: T, + rawMessage?: amqplib.ConsumeMessage + ) => Promise, + msgOptions: MessageHandlerOptions, + channel: amqplib.ConfirmChannel + ): Promise { const { exchange, routingKey } = msgOptions; - const { queue } = await this.channel.assertQueue( + const { queue } = await channel.assertQueue( msgOptions.queue || '', msgOptions.queueOptions || undefined ); @@ -109,10 +153,10 @@ export class AmqpConnection { const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey]; await Promise.all( - routingKeys.map(x => this.channel.bindQueue(queue, exchange, x)) + routingKeys.map(x => channel.bindQueue(queue, exchange, x)) ); - await this.channel.consume(queue, async msg => { + await channel.consume(queue, async msg => { try { if (msg == null) { throw new Error('Received null message'); @@ -121,7 +165,7 @@ export class AmqpConnection { const message = JSON.parse(msg.content.toString()) as T; const response = await handler(message, msg); if (response instanceof Nack) { - this._channel.nack(msg, false, response.requeue); + channel.nack(msg, false, response.requeue); return; } @@ -131,7 +175,7 @@ export class AmqpConnection { ); } - this._channel.ack(msg); + channel.ack(msg); } catch (e) { if (msg == null) { return; @@ -141,15 +185,15 @@ export class AmqpConnection { this.config.defaultSubscribeErrorBehavior; switch (errorBehavior) { case MessageHandlerErrorBehavior.ACK: { - this._channel.ack(msg); + channel.ack(msg); break; } case MessageHandlerErrorBehavior.REQUEUE: { - this._channel.nack(msg, false, true); + channel.nack(msg, false, true); break; } default: - this._channel.nack(msg, false, false); + channel.nack(msg, false, false); } } } @@ -162,10 +206,23 @@ export class AmqpConnection { rawMessage?: amqplib.ConsumeMessage ) => Promise>, rpcOptions: MessageHandlerOptions + ) { + return this._channel.addSetup(channel => + this.setupRpcChannel(handler, rpcOptions, channel) + ); + } + + public async setupRpcChannel( + handler: ( + msg: T, + rawMessage?: amqplib.ConsumeMessage + ) => Promise>, + rpcOptions: MessageHandlerOptions, + channel: amqplib.ConfirmChannel ) { const { exchange, routingKey } = rpcOptions; - const { queue } = await this.channel.assertQueue( + const { queue } = await channel.assertQueue( rpcOptions.queue || '', rpcOptions.queueOptions || undefined ); @@ -173,10 +230,10 @@ export class AmqpConnection { const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey]; await Promise.all( - routingKeys.map(x => this.channel.bindQueue(queue, exchange, x)) + routingKeys.map(x => channel.bindQueue(queue, exchange, x)) ); - await this.channel.consume(queue, async msg => { + await channel.consume(queue, async msg => { try { if (msg == null) { throw new Error('Received null message'); @@ -186,7 +243,7 @@ export class AmqpConnection { const response = await handler(message, msg); if (response instanceof Nack) { - this._channel.nack(msg, false, response.requeue); + channel.nack(msg, false, response.requeue); return; } @@ -194,7 +251,7 @@ export class AmqpConnection { if (replyTo) { this.publish('', replyTo, response, { correlationId }); } - this._channel.ack(msg); + channel.ack(msg); } catch (e) { if (msg == null) { return; @@ -203,15 +260,15 @@ export class AmqpConnection { rpcOptions.errorBehavior || this.config.defaultRpcErrorBehavior; switch (errorBehavior) { case MessageHandlerErrorBehavior.ACK: { - this._channel.ack(msg); + channel.ack(msg); break; } case MessageHandlerErrorBehavior.REQUEUE: { - this._channel.nack(msg, false, true); + channel.nack(msg, false, true); break; } default: - this._channel.nack(msg, false, false); + channel.nack(msg, false, false); } } } @@ -224,33 +281,11 @@ export class AmqpConnection { message: any, options?: amqplib.Options.Publish ) { - this.channel.publish( + return this.channel.publish( exchange, routingKey, Buffer.from(JSON.stringify(message)), options ); } - - private async initDirectReplyQueue() { - // Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality - await this._channel.consume( - DIRECT_REPLY_QUEUE, - async msg => { - if (msg == null) { - return; - } - - const correlationMessage: CorrelationMessage = { - correlationId: msg.properties.correlationId.toString(), - message: JSON.parse(msg.content.toString()) - }; - - this.messageSubject.next(correlationMessage); - }, - { - noAck: true - } - ); - } }