Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rabbit): support multiple configs on the same handler #682

Merged
merged 1 commit into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const exchange = 'testSubscribeExchange';
const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const routingKey3 = 'testSubscribeViaHandlerRoute';
const routingKey4 = 'testSubscribeViaHandlerRouteMulti1';
const routingKey5 = 'testSubscribeViaHandlerRouteMulti2';
const nonJsonRoutingKey = 'nonJsonSubscribeRoute';

const createRoutingKey = 'test.create.object';
Expand All @@ -40,6 +42,13 @@ class SubscribeService {
testHandler(message);
}

@RabbitSubscribe({
name: 'handler2',
})
handleSubscribeByNameMulti(message: object) {
testHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [routingKey1, routingKey2],
Expand Down Expand Up @@ -152,6 +161,16 @@ describe('Rabbit Subscribe', () => {
exchange,
routingKey: [routingKey3],
},
handler2: [
{
exchange,
routingKey: routingKey4,
},
{
exchange,
routingKey: routingKey5,
},
],
},
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
Expand Down Expand Up @@ -196,6 +215,17 @@ describe('Rabbit Subscribe', () => {
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
});

it('should receive all messages when subscribed via handler name with multiple configs', async () => {
await amqpConnection.publish(exchange, routingKey4, 'testMessage');
await amqpConnection.publish(exchange, routingKey5, 'testMessage2');

await new Promise((resolve) => setTimeout(resolve, 50));

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
expect(testHandler).toHaveBeenCalledWith(`testMessage2`);
});

it('should work with a topic exchange set up that has multiple subscribers with similar routing keys', async () => {
const routingKeys = [createRoutingKey, updateRoutingKey, deleteRoutingKey];

Expand Down
5 changes: 4 additions & 1 deletion packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ export interface ConnectionInitOptions {
}

export type RabbitMQChannels = Record<string, RabbitMQChannelConfig>;
export type RabbitMQHandlers = Record<string, MessageHandlerOptions>;
export type RabbitMQHandlers = Record<
string,
MessageHandlerOptions | MessageHandlerOptions[]
>;

export interface RabbitMQConfig {
name?: string;
Expand Down
85 changes: 57 additions & 28 deletions packages/rabbitmq/src/rabbitmq.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { DiscoveryModule, DiscoveryService } from '@golevelup/nestjs-discovery';
import {
DiscoveredMethod,
DiscoveredMethodWithMeta,
DiscoveryModule,
DiscoveryService,
} from '@golevelup/nestjs-discovery';
import {
createConfigurableDynamicRootModule,
IConfigurableDynamicRootModule,
Expand Down Expand Up @@ -135,6 +140,40 @@ export class RabbitMQModule
RabbitMQModule.bootstrapped = false;
}

private async setupHandler(
connection: AmqpConnection,
discoveredMethod: DiscoveredMethod,
config: RabbitHandlerConfig,
handler: (...args: any[]) => Promise<any>
) {
const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${
// eslint-disable-next-line sonarjs/no-nested-template-literals
config.queueOptions?.channel ? `${config.queueOptions.channel}::` : ''
}${config.exchange}::${config.routingKey}::${config.queue || 'amqpgen'}`;

if (
config.type === 'rpc' &&
!connection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}

this.logger.log(handlerDisplayName);

return config.type === 'rpc'
? connection.createRpc(handler, config)
: connection.createSubscriber(
handler,
config,
discoveredMethod.methodName
);
}

// eslint-disable-next-line sonarjs/cognitive-complexity
public async onApplicationBootstrap() {
if (RabbitMQModule.bootstrapped) {
Expand Down Expand Up @@ -199,38 +238,28 @@ export class RabbitMQModule
'rmq' // contextType
);

const mergedConfig = {
...config,
...connection.configuration.handlers[config.name || ''],
};
const { exchange, routingKey, queue, queueOptions } = mergedConfig;
const moduleHandlerConfigRaw =
connection.configuration.handlers[config.name || ''];

const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${
// eslint-disable-next-line sonarjs/no-nested-template-literals
queueOptions?.channel ? `${queueOptions.channel}::` : ''
}${exchange}::${routingKey}::${queue || 'amqpgen'}`;

if (
config.type === 'rpc' &&
!connection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}
const moduleHandlerConfigs = Array.isArray(moduleHandlerConfigRaw)
? moduleHandlerConfigRaw
: [moduleHandlerConfigRaw];

this.logger.log(handlerDisplayName);
await Promise.all(
moduleHandlerConfigs.map((moduleHandlerConfig) => {
const mergedConfig = {
...config,
...moduleHandlerConfig,
};

return config.type === 'rpc'
? connection.createRpc(handler, mergedConfig)
: connection.createSubscriber(
handler,
return this.setupHandler(
connection,
discoveredMethod,
mergedConfig,
discoveredMethod.methodName
handler
);
})
);
})
);
}
Expand Down
Loading