Skip to content

Commit

Permalink
feat: decorator definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jun 24, 2024
1 parent aa57cbc commit 369f4ae
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 58 deletions.
43 changes: 25 additions & 18 deletions lib/common/meta-teg.discovery.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
import { Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { ModulesContainer, Reflector } from '@nestjs/core';
import { MetadataScanner } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { TARGET_MODULE } from '../constants';
import { MESSAGE_ROUTER, MODULE_TOKEN, TARGET_MODULE } from '../constants';
import { IMetaTegsMap } from '../interfaces';
import { RQMColorLogger } from './logger';
import { Module } from '@nestjs/core/injector/module';

@Injectable()
export class MetaTegsScannerService {
logger = new RQMColorLogger(false);
constructor(
private readonly metadataScanner: MetadataScanner,
private readonly reflector: Reflector,
private readonly modulesContainer: ModulesContainer,
@Inject(TARGET_MODULE) private readonly targetModuleName: string
private readonly modulesContainer: ModulesContainer
) {}

public scan(metaTeg: string) {
public findModulesByProviderValue(tokenValue: string): Module {
for (const module of this.modulesContainer.values()) {
const importsModules: Module[] = [...module.imports.values()];
for (const importedModule of importsModules) {
const provider = importedModule.providers.get(MODULE_TOKEN);
if (provider && provider.instance === tokenValue) return module;
}
}
return null;
}
public scan(metaTeg: string, tokenValue: string) {
const rmqMessagesMap = new Map();

const currentModule = this.getCurrentModule();
const currentModule = this.findModulesByProviderValue(tokenValue);
if (!currentModule) return rmqMessagesMap;

const providersAndControllers =
this.getProvidersAndControllers(currentModule);

providersAndControllers.forEach((provider: InstanceWrapper) => {
const { instance } = provider;

const prototype = Object.getPrototypeOf(instance);
this.metadataScanner
.getAllMethodNames(prototype)
Expand All @@ -35,15 +46,8 @@ export class MetaTegsScannerService {

return rmqMessagesMap;
}
private getCurrentModule() {
const modules = [...this.modulesContainer.values()];
return (
modules.find(
(module) => module.metatype?.name === this.targetModuleName
) || null
);
}
private getProvidersAndControllers(module) {

private getProvidersAndControllers(module: Module) {
return [...module.providers.values(), ...module.controllers.values()];
}
private lookupMethods(
Expand All @@ -56,6 +60,9 @@ export class MetaTegsScannerService {
const method = prototype[methodName];
const event = this.reflector.get<string>(metaTeg, method);
const boundHandler = instance[methodName].bind(instance);
if (event) rmqMessagesMap.set(event, boundHandler);
if (event) {
rmqMessagesMap.set(event, boundHandler);
this.logger.log('Mapped ' + event, MESSAGE_ROUTER);
}
}
}
2 changes: 2 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export const TARGET_MODULE = 'TARGET_MODULE';

export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;
export const MESSAGE_ROUTER = 'MessageRouterExplorer';
export const MODULE_TOKEN = 'MODULE_UNIQ_TOKEN';

export const INDICATE_ERROR = 'Please indicate `replyToQueue`';
export const TIMEOUT_ERROR = 'Response timeout error';
Expand Down
2 changes: 2 additions & 0 deletions lib/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './rmq-message.decorator';
export * from './transform.decorator';
2 changes: 1 addition & 1 deletion lib/decorators/rmq-message.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RMQ_MESSAGE_META_TEG } from '../constants';

export function RMQEvent(event: string) {
export function MessageRoute(event: string) {
return function (target: any, propertyKey: string | symbol, descriptor: any) {
Reflect.defineMetadata(RMQ_MESSAGE_META_TEG, event, descriptor.value);
};
Expand Down
1 change: 1 addition & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './rmq.module';
export * from './rmq.service';
export * from './decorators';
1 change: 0 additions & 1 deletion lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface IMessageBroker {
replyTo?: Options.AssertQueue;
queue?: IQueue;
messageTimeout?: number;
targetModuleName: string;
serviceName?: string;
}
export interface IBindQueue {
Expand Down
26 changes: 13 additions & 13 deletions lib/rmq-connect.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {

private declared = false;
constructor(
@Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig,
@Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig
) {}
async onModuleInit(): Promise<void> {
if (this.declared) throw Error('Root RmqNestjsModule already declared!');
Expand All @@ -34,18 +34,18 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}

public async assertExchange(
options: IExchange,
options: IExchange
): Promise<Replies.AssertExchange> {
try {
const exchange = await this.baseChannel.assertExchange(
options.exchange,
options.type,
options.options,
options.options
);
return exchange;
} catch (error) {
throw new Error(
`Failed to assert exchange '${options.exchange}': ${error.message}`,
`Failed to assert exchange '${options.exchange}': ${error.message}`
);
}
}
Expand All @@ -56,19 +56,19 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}
public async assertQueue(
typeQueue: TypeQueue,
options: IQueue,
options: IQueue
): Promise<Replies.AssertQueue> {
try {
if (typeQueue == TypeQueue.QUEUE) {
const queue = await this.baseChannel.assertQueue(
options.queue,
options.options,
options.options
);
return queue;
}
const queue = await this.replyToChannel.assertQueue(
options.queue || '',
options.options,
options.options
);
return queue;
} catch (error) {
Expand All @@ -81,11 +81,11 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
bindQueue.queue,
bindQueue.source,
bindQueue.pattern,
bindQueue.args,
bindQueue.args
);
} catch (error) {
throw new Error(
`Failed to Bind Queue ,source:${bindQueue.source} queue: ${bindQueue.queue}`,
`Failed to Bind Queue ,source:${bindQueue.source} queue: ${bindQueue.queue}`
);
}
}
Expand All @@ -96,15 +96,15 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
Buffer.from(JSON.stringify(sendToQueueOptions.content)),
{
correlationId: sendToQueueOptions.correlationId,
},
}
);
} catch (error) {
throw new Error(`Failed to send Reply Queue`);
}
}
async listenReplyQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void,
listenQueue: (msg: ConsumeMessage | null) => void
) {
try {
await this.replyToChannel.consume(queue, listenQueue, {
Expand All @@ -116,7 +116,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
}
async listenQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void,
listenQueue: (msg: ConsumeMessage | null) => void
): Promise<void> {
try {
await this.baseChannel.consume(queue, listenQueue, {
Expand All @@ -136,7 +136,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
{
replyTo: sendMessage.options.replyTo,
correlationId: sendMessage.options.correlationId,
},
}
);
} catch (error) {
throw new Error(`Failed to send message ${error}`);
Expand Down
1 change: 0 additions & 1 deletion lib/rmq-core.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { DynamicModule, Module, Global } from '@nestjs/common';
import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants';
import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces';

import { RmqNestjsConnectService } from './rmq-connect.service';
import { IAppOptions } from './interfaces/app-options.interface';

Expand Down
8 changes: 4 additions & 4 deletions lib/rmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import {
IRMQSRootAsyncOptions,
IRabbitMQConfig,
} from './interfaces';
import { RMQ_BROKER_OPTIONS, TARGET_MODULE } from './constants';
import { MODULE_TOKEN, RMQ_BROKER_OPTIONS, TARGET_MODULE } from './constants';
import { DiscoveryModule } from '@nestjs/core';
import { MetaTegsScannerService } from './common';
import { MetaTegsScannerService, getUniqId } from './common';
import { RmqNestjsCoreModule } from './rmq-core.module';
import { IAppOptions } from './interfaces/app-options.interface';

Expand Down Expand Up @@ -37,11 +37,11 @@ export class RmqNestjsModule {
imports: [DiscoveryModule],
providers: [
{ provide: RMQ_BROKER_OPTIONS, useValue: options },
{ provide: TARGET_MODULE, useValue: options.targetModuleName },
{ provide: MODULE_TOKEN, useFactory: getUniqId },
RmqService,
MetaTegsScannerService,
],
exports: [RmqService, MetaTegsScannerService],
exports: [RmqService, MetaTegsScannerService, MODULE_TOKEN],
};
}
}
33 changes: 18 additions & 15 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import {
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';

import { IMessageBroker, IPublishOptions, TypeQueue } from './interfaces';
import { IMetaTegsMap } from './interfaces/metategs';
import {
DEFAULT_TIMEOUT,
INDICATE_ERROR,
INITIALIZATION_STEP_DELAY,
INOF_NOT_FULL_OPTIONS,
MODULE_TOKEN,
RECIVED_MESSAGE_ERROR,
RMQ_APP_OPTIONS,
RMQ_BROKER_OPTIONS,
Expand Down Expand Up @@ -41,23 +41,26 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
private readonly metaTegsScannerService: MetaTegsScannerService,
@Inject(RMQ_BROKER_OPTIONS) private options: IMessageBroker,
@Inject(RMQ_APP_OPTIONS) private appOptions: IAppOptions,
@Inject(MODULE_TOKEN) private readonly moduleToken: string
) {
this.logger = appOptions.logger
? appOptions.logger
: new RQMColorLogger(this.appOptions.logMessages);
}

async onModuleInit() {
this.rmqMessageTegs =
this.metaTegsScannerService.scan(RMQ_MESSAGE_META_TEG);
this.rmqMessageTegs = this.metaTegsScannerService.scan(
RMQ_MESSAGE_META_TEG,
this.moduleToken
);
await this.init();
this.isInitialized = true;
}

public async notify<IMessage>(
topic: string,
message: IMessage,
options?: IPublishOptions,
options?: IPublishOptions
) {
await this.initializationCheck();
this.rmqNestjsConnectService.publish({
Expand All @@ -75,7 +78,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
public async send<IMessage, IReply>(
topic: string,
message: IMessage,
options?: IPublishOptions,
options?: IPublishOptions
): Promise<IReply> {
await this.initializationCheck();
if (!this.replyToQueue) return this.logger.error(INDICATE_ERROR);
Expand Down Expand Up @@ -107,10 +110,10 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
});
}
private async listenQueue(message: ConsumeMessage | null): Promise<void> {
if (!message) return;
const consumeFunction = this.rmqMessageTegs.get(message.fields.routingKey);
if (!consumeFunction) return;
const result = await consumeFunction(
JSON.parse(message.content.toString()),
JSON.parse(message.content.toString())
);
if (message.properties.replyTo) {
await this.rmqNestjsConnectService.sendToReplyQueue({
Expand All @@ -122,7 +125,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
this.rmqNestjsConnectService.ack(message);
}
private async listenReplyQueue(
message: ConsumeMessage | null,
message: ConsumeMessage | null
): Promise<void> {
if (message.properties.correlationId) {
this.sendResponseEmitter.emit(message.properties.correlationId, message);
Expand All @@ -131,20 +134,20 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {

private async init() {
this.exchange = await this.rmqNestjsConnectService.assertExchange(
this.options.exchange,
this.options.exchange
);
if (this.options.replyTo) await this.assertReplyQueueBind();
await this.bindQueueExchange();
}
private async bindQueueExchange() {
if (!this.options.queue || !this.rmqMessageTegs?.size)
return this.logger.warn(
this.options.targetModuleName,
INOF_NOT_FULL_OPTIONS,
this.options.exchange.exchange
);
const queue = await this.rmqNestjsConnectService.assertQueue(
TypeQueue.QUEUE,
this.options.queue,
this.options.queue
);
this.rmqMessageTegs.forEach(async (_, key) => {
await this.rmqNestjsConnectService.bindQueue({
Expand All @@ -155,24 +158,24 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
});
await this.rmqNestjsConnectService.listenQueue(
this.options.queue.queue,
this.listenQueue.bind(this),
this.listenQueue.bind(this)
);
}

private async assertReplyQueueBind() {
this.replyToQueue = await this.rmqNestjsConnectService.assertQueue(
TypeQueue.REPLY_QUEUE,
{ queue: '', options: this.options.replyTo },
{ queue: '', options: this.options.replyTo }
);
await this.rmqNestjsConnectService.listenReplyQueue(
this.replyToQueue.queue,
this.listenReplyQueue.bind(this),
this.listenReplyQueue.bind(this)
);
}
private async initializationCheck() {
if (this.isInitialized) return;
await new Promise<void>((resolve) =>
setTimeout(resolve, INITIALIZATION_STEP_DELAY),
setTimeout(resolve, INITIALIZATION_STEP_DELAY)
);
await this.initializationCheck();
}
Expand Down
4 changes: 1 addition & 3 deletions test/mocks/rmq-nestjs.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Module } from '@nestjs/common';
import { RmqNestjsModule, RmqService } from '../../lib';
import { RmqNestjsModule } from '../../lib';
import { RmqEvents } from './rmq.event';
import { RmqServieController } from './rmq.controller';

Expand All @@ -12,9 +12,7 @@ import { RmqServieController } from './rmq.controller';
options: { durable: true },
},
queue: { queue: 'test-for', options: { durable: true } },

replyTo: { durable: true },
targetModuleName: 'ConnectionMockModule',
}),
],
providers: [RmqEvents, RmqServieController],
Expand Down
Loading

0 comments on commit 369f4ae

Please sign in to comment.