diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index f602e10ba..d0fdee40f 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -592,12 +592,7 @@ export class AmqpConnection { routingKey: string, message: T, options?: Options.Publish - ): Promise { - // source amqplib channel is used directly to keep the behavior of throwing connection related errors - if (!this.managedConnection.isConnected() || !this._channel) { - throw new Error('AMQP connection is not available'); - } - + ): Promise { let buffer: Buffer; if (message instanceof Buffer) { buffer = message; @@ -609,21 +604,12 @@ export class AmqpConnection { buffer = Buffer.alloc(0); } - return new Promise((resolve, reject) => { - this._channel.publish( - exchange, - routingKey, - buffer, - options, - (err, ok) => { - if (err) { - reject(err); - } else { - resolve(ok); - } - } - ); - }); + return this.selectManagedChannel().publish( + exchange, + routingKey, + buffer, + options + ); } private handleMessage(