Skip to content

Commit

Permalink
feat(rabbitmq): Added a new decorator @RabbitHeader()
Browse files Browse the repository at this point in the history
* feat(add_header_decorator): @RabbitHeader() decorator added

now we can access provided headers inside consumers

* fix(merged): branch merged with parent

* fix(rabbit_header): index modified inside RabbitRpcParamsFactory

* fix(namedconnectionmodule): rabbit uri changed to its default

* fix(doc): emir muhammadzadeh added
  • Loading branch information
muhammadzadeh authored Jul 25, 2022
1 parent 1d26ad5 commit b283945
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 13 deletions.
12 changes: 11 additions & 1 deletion .all-contributorsrc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@
"code"
]
},
{
"login": "muhammadzadeh",
"name": "Emir Muhammadzadeh",
"avatar_url": "https://avatars.githubusercontent.com/u/10474363?v=4",
"profile": "https://www.linkedin.com/in/muhammadzadeh/",
"contributions": [
"code",
"doc"
]
},
{
"login": "craigotis",
"name": "Craig Otis",
Expand All @@ -302,4 +312,4 @@
}
],
"contributorsPerLine": 7
}
}
28 changes: 21 additions & 7 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ export type ConsumerTag = string;

export type SubscriberHandler<T = unknown> = (
msg: T | undefined,
rawMessage?: ConsumeMessage
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<SubscribeResponse>;

export interface CorrelationMessage {
Expand All @@ -65,15 +66,17 @@ export type ConsumerHandler<T, U> =
msgOptions: MessageHandlerOptions;
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<SubscribeResponse>;
})
| (BaseConsumerHandler & {
type: 'rpc';
rpcOptions: MessageHandlerOptions;
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<RpcResponse<U>>;
});

Expand Down Expand Up @@ -422,7 +425,8 @@ export class AmqpConnection {
public async createRpc<T, U>(
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions
): Promise<SubscriptionResult> {
Expand All @@ -443,7 +447,8 @@ export class AmqpConnection {
public async setupRpcChannel<T, U>(
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions,
channel: ConfirmChannel
Expand Down Expand Up @@ -533,11 +538,16 @@ export class AmqpConnection {
}

private handleMessage<T, U>(
handler: (msg: T | undefined, rawMessage?: ConsumeMessage) => Promise<U>,
handler: (
msg: T | undefined,
rawMessage?: ConsumeMessage,
headers?: any
) => Promise<U>,
msg: ConsumeMessage,
allowNonJsonMessages?: boolean
) {
let message: T | undefined = undefined;
let headers: any = undefined;
if (msg.content) {
if (allowNonJsonMessages) {
try {
Expand All @@ -551,7 +561,11 @@ export class AmqpConnection {
}
}

return handler(message, msg);
if (msg.properties && msg.properties.headers) {
headers = msg.properties.headers;
}

return handler(message, msg, headers);
}

private async setupQueue(
Expand Down
2 changes: 2 additions & 0 deletions packages/rabbitmq/src/rabbitmq.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ export const RABBIT_HANDLER = Symbol('RABBIT_HANDLER');
export const RABBIT_CONFIG_TOKEN = Symbol('RABBIT_CONFIG');
export const RABBIT_ARGS_METADATA = 'RABBIT_ARGS_METADATA';
export const RABBIT_PARAM_TYPE = 3;
export const RABBIT_HEADER_TYPE = 4;
export const RABBIT_REQUEST_TYPE = 5;
49 changes: 47 additions & 2 deletions packages/rabbitmq/src/rabbitmq.decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import {
RABBIT_ARGS_METADATA,
RABBIT_CONFIG_TOKEN,
RABBIT_HANDLER,
RABBIT_HEADER_TYPE,
RABBIT_PARAM_TYPE,
RABBIT_REQUEST_TYPE,
} from './rabbitmq.constants';
import { RabbitHandlerConfig } from './rabbitmq.interfaces';

Expand Down Expand Up @@ -40,6 +42,7 @@ export const InjectRabbitMQConfig =
export const createPipesRpcParamDecorator =
(
data?: any,
type: number = RABBIT_PARAM_TYPE,
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator =>
(target, key, index) => {
Expand All @@ -52,7 +55,7 @@ export const createPipesRpcParamDecorator =

Reflect.defineMetadata(
RABBIT_ARGS_METADATA,
assignMetadata(args, RABBIT_PARAM_TYPE, index, paramData, ...paramPipes),
assignMetadata(args, type, index, paramData, ...paramPipes),
target.constructor,
key
);
Expand All @@ -70,5 +73,47 @@ export function RabbitPayload(
propertyOrPipe?: string | (Type<PipeTransform> | PipeTransform),
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator {
return createPipesRpcParamDecorator(propertyOrPipe, ...pipes);
return createPipesRpcParamDecorator(
propertyOrPipe,
RABBIT_PARAM_TYPE,
...pipes
);
}

export function RabbitHeader(): ParameterDecorator;
export function RabbitHeader(
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator;
export function RabbitHeader(
propertyKey?: string,
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator;
export function RabbitHeader(
propertyOrPipe?: string | (Type<PipeTransform> | PipeTransform),
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator {
return createPipesRpcParamDecorator(
propertyOrPipe,
RABBIT_HEADER_TYPE,
...pipes
);
}

export function RabbitRequest(): ParameterDecorator;
export function RabbitRequest(
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator;
export function RabbitRequest(
propertyKey?: string,
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator;
export function RabbitRequest(
propertyOrPipe?: string | (Type<PipeTransform> | PipeTransform),
...pipes: (Type<PipeTransform> | PipeTransform)[]
): ParameterDecorator {
return createPipesRpcParamDecorator(
propertyOrPipe,
RABBIT_REQUEST_TYPE,
...pipes
);
}
19 changes: 16 additions & 3 deletions packages/rabbitmq/src/rabbitmq.factory.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
import { ParamData } from '@nestjs/common';
import { isObject } from 'lodash';
import { RABBIT_PARAM_TYPE } from './rabbitmq.constants';
import {
RABBIT_HEADER_TYPE,
RABBIT_PARAM_TYPE,
RABBIT_REQUEST_TYPE,
} from './rabbitmq.constants';

export class RabbitRpcParamsFactory {
public exchangeKeyForValue(type: number, data: ParamData, args: any[]) {
if (!args || type !== RABBIT_PARAM_TYPE) {
if (!args) {
return null;
}

return data && !isObject(data) ? args[0]?.[data] : args[0];
let index = 0;
if (type === RABBIT_PARAM_TYPE) {
index = 0;
} else if (type === RABBIT_REQUEST_TYPE) {
index = 1;
} else if (type === RABBIT_HEADER_TYPE) {
index = 2;
}

return data && !isObject(data) ? args[index]?.[data] : args[index];
}
}

0 comments on commit b283945

Please sign in to comment.