Skip to content

Commit

Permalink
feat(conn-mgr): use connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
azuker authored and WonderPanda committed Jan 12, 2020
1 parent 0bf5e94 commit 9049058
Showing 1 changed file with 87 additions and 52 deletions.
139 changes: 87 additions & 52 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as amqplib from 'amqplib';
import { interval, race, Subject } from 'rxjs';
import { filter, first, map } from 'rxjs/operators';
import * as uuid from 'uuid';
import * as amqpcon from 'amqp-connection-manager';
import {
MessageHandlerErrorBehavior,
MessageHandlerOptions,
Expand All @@ -28,9 +29,10 @@ const defaultConfig = {

export class AmqpConnection {
private messageSubject = new Subject<CorrelationMessage>();
private _connection!: amqplib.Connection;
private _channel!: amqplib.Channel;
private _connection!: amqpcon.AmqpConnectionManager;
private _channel!: amqpcon.ChannelWrapper;
private config: Required<RabbitMQConfig>;

constructor(config: RabbitMQConfig) {
this.config = { ...defaultConfig, ...config };
}
Expand All @@ -44,22 +46,51 @@ export class AmqpConnection {
}

public async init() {
this._connection = await amqplib.connect(this.config.uri);
this._channel = await this._connection.createChannel();
const connectionManager = amqpcon.connect([this.config.uri]);
this._connection = connectionManager;

// this._channel = this._connection.createChannel({
// setup: this.setupInitChannel.bind(this),
// });
this._channel = this._connection.createChannel({});
await this._channel.addSetup(this.setupInitChannel.bind(this));
}

await Promise.all(
this.config.exchanges.map(async x =>
this._channel.assertExchange(
x.name,
x.type || this.config.defaultExchangeType,
x.options
)
private async setupInitChannel(
channel: amqplib.ConfirmChannel
): Promise<void> {
this.config.exchanges.map(async x =>
channel.assertExchange(
x.name,
x.type || this.config.defaultExchangeType,
x.options
)
);

await this.channel.prefetch(this.config.prefetchCount);
await channel.prefetch(this.config.prefetchCount);
await this.initDirectReplyQueue(channel);
}

await this.initDirectReplyQueue();
private async initDirectReplyQueue(channel: amqplib.ConfirmChannel) {
// Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality
await channel.consume(
DIRECT_REPLY_QUEUE,
async msg => {
if (msg == null) {
return;
}

const correlationMessage: CorrelationMessage = {
correlationId: msg.properties.correlationId.toString(),
message: JSON.parse(msg.content.toString())
};

this.messageSubject.next(correlationMessage);
},
{
noAck: true
}
);
}

public async request<T extends {}>(
Expand Down Expand Up @@ -99,20 +130,33 @@ export class AmqpConnection {
) => Promise<SubscribeResponse>,
msgOptions: MessageHandlerOptions
) {
return this._channel.addSetup(channel =>
this.setupSubscriberChannel<T>(handler, msgOptions, channel)
);
}

private async setupSubscriberChannel<T>(
handler: (
msg: T,
rawMessage?: amqplib.ConsumeMessage
) => Promise<SubscribeResponse>,
msgOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
): Promise<void> {
const { exchange, routingKey } = msgOptions;

const { queue } = await this.channel.assertQueue(
const { queue } = await channel.assertQueue(
msgOptions.queue || '',
msgOptions.queueOptions || undefined
);

const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];

await Promise.all(
routingKeys.map(x => this.channel.bindQueue(queue, exchange, x))
routingKeys.map(x => channel.bindQueue(queue, exchange, x))
);

await this.channel.consume(queue, async msg => {
await channel.consume(queue, async msg => {
try {
if (msg == null) {
throw new Error('Received null message');
Expand All @@ -121,7 +165,7 @@ export class AmqpConnection {
const message = JSON.parse(msg.content.toString()) as T;
const response = await handler(message, msg);
if (response instanceof Nack) {
this._channel.nack(msg, false, response.requeue);
channel.nack(msg, false, response.requeue);
return;
}

Expand All @@ -131,7 +175,7 @@ export class AmqpConnection {
);
}

this._channel.ack(msg);
channel.ack(msg);
} catch (e) {
if (msg == null) {
return;
Expand All @@ -141,15 +185,15 @@ export class AmqpConnection {
this.config.defaultSubscribeErrorBehavior;
switch (errorBehavior) {
case MessageHandlerErrorBehavior.ACK: {
this._channel.ack(msg);
channel.ack(msg);
break;
}
case MessageHandlerErrorBehavior.REQUEUE: {
this._channel.nack(msg, false, true);
channel.nack(msg, false, true);
break;
}
default:
this._channel.nack(msg, false, false);
channel.nack(msg, false, false);
}
}
}
Expand All @@ -162,21 +206,34 @@ export class AmqpConnection {
rawMessage?: amqplib.ConsumeMessage
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions
) {
return this._channel.addSetup(channel =>
this.setupRpcChannel<T, U>(handler, rpcOptions, channel)
);
}

public async setupRpcChannel<T, U>(
handler: (
msg: T,
rawMessage?: amqplib.ConsumeMessage
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
) {
const { exchange, routingKey } = rpcOptions;

const { queue } = await this.channel.assertQueue(
const { queue } = await channel.assertQueue(
rpcOptions.queue || '',
rpcOptions.queueOptions || undefined
);

const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];

await Promise.all(
routingKeys.map(x => this.channel.bindQueue(queue, exchange, x))
routingKeys.map(x => channel.bindQueue(queue, exchange, x))
);

await this.channel.consume(queue, async msg => {
await channel.consume(queue, async msg => {
try {
if (msg == null) {
throw new Error('Received null message');
Expand All @@ -186,15 +243,15 @@ export class AmqpConnection {
const response = await handler(message, msg);

if (response instanceof Nack) {
this._channel.nack(msg, false, response.requeue);
channel.nack(msg, false, response.requeue);
return;
}

const { replyTo, correlationId } = msg.properties;
if (replyTo) {
this.publish('', replyTo, response, { correlationId });
}
this._channel.ack(msg);
channel.ack(msg);
} catch (e) {
if (msg == null) {
return;
Expand All @@ -203,15 +260,15 @@ export class AmqpConnection {
rpcOptions.errorBehavior || this.config.defaultRpcErrorBehavior;
switch (errorBehavior) {
case MessageHandlerErrorBehavior.ACK: {
this._channel.ack(msg);
channel.ack(msg);
break;
}
case MessageHandlerErrorBehavior.REQUEUE: {
this._channel.nack(msg, false, true);
channel.nack(msg, false, true);
break;
}
default:
this._channel.nack(msg, false, false);
channel.nack(msg, false, false);
}
}
}
Expand All @@ -224,33 +281,11 @@ export class AmqpConnection {
message: any,
options?: amqplib.Options.Publish
) {
this.channel.publish(
return this.channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
options
);
}

private async initDirectReplyQueue() {
// Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality
await this._channel.consume(
DIRECT_REPLY_QUEUE,
async msg => {
if (msg == null) {
return;
}

const correlationMessage: CorrelationMessage = {
correlationId: msg.properties.correlationId.toString(),
message: JSON.parse(msg.content.toString())
};

this.messageSubject.next(correlationMessage);
},
{
noAck: true
}
);
}
}

0 comments on commit 9049058

Please sign in to comment.