From 369f4aecad1e94a47e58486c5c11a0fdb44f088b Mon Sep 17 00:00:00 2001 From: diy0r Date: Tue, 25 Jun 2024 00:36:32 +0500 Subject: [PATCH] feat: decorator definitions --- lib/common/meta-teg.discovery.ts | 43 ++++++++++++++----------- lib/constants.ts | 2 ++ lib/decorators/index.ts | 2 ++ lib/decorators/rmq-message.decorator.ts | 2 +- lib/index.ts | 1 + lib/interfaces/rmq-options.interface.ts | 1 - lib/rmq-connect.service.ts | 26 +++++++-------- lib/rmq-core.module.ts | 1 - lib/rmq.module.ts | 8 ++--- lib/rmq.service.ts | 33 ++++++++++--------- test/mocks/rmq-nestjs.module.ts | 4 +-- test/mocks/rmq.event.ts | 4 +-- 12 files changed, 69 insertions(+), 58 deletions(-) create mode 100644 lib/decorators/index.ts diff --git a/lib/common/meta-teg.discovery.ts b/lib/common/meta-teg.discovery.ts index d743b07..ade1f17 100644 --- a/lib/common/meta-teg.discovery.ts +++ b/lib/common/meta-teg.discovery.ts @@ -1,23 +1,33 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { ModulesContainer, Reflector } from '@nestjs/core'; import { MetadataScanner } from '@nestjs/core'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; -import { TARGET_MODULE } from '../constants'; +import { MESSAGE_ROUTER, MODULE_TOKEN, TARGET_MODULE } from '../constants'; import { IMetaTegsMap } from '../interfaces'; +import { RQMColorLogger } from './logger'; +import { Module } from '@nestjs/core/injector/module'; @Injectable() export class MetaTegsScannerService { + logger = new RQMColorLogger(false); constructor( private readonly metadataScanner: MetadataScanner, private readonly reflector: Reflector, - private readonly modulesContainer: ModulesContainer, - @Inject(TARGET_MODULE) private readonly targetModuleName: string + private readonly modulesContainer: ModulesContainer ) {} - - public scan(metaTeg: string) { + public findModulesByProviderValue(tokenValue: string): Module { + for (const module of this.modulesContainer.values()) { + const importsModules: Module[] = [...module.imports.values()]; + for (const importedModule of importsModules) { + const provider = importedModule.providers.get(MODULE_TOKEN); + if (provider && provider.instance === tokenValue) return module; + } + } + return null; + } + public scan(metaTeg: string, tokenValue: string) { const rmqMessagesMap = new Map(); - - const currentModule = this.getCurrentModule(); + const currentModule = this.findModulesByProviderValue(tokenValue); if (!currentModule) return rmqMessagesMap; const providersAndControllers = @@ -25,6 +35,7 @@ export class MetaTegsScannerService { providersAndControllers.forEach((provider: InstanceWrapper) => { const { instance } = provider; + const prototype = Object.getPrototypeOf(instance); this.metadataScanner .getAllMethodNames(prototype) @@ -35,15 +46,8 @@ export class MetaTegsScannerService { return rmqMessagesMap; } - private getCurrentModule() { - const modules = [...this.modulesContainer.values()]; - return ( - modules.find( - (module) => module.metatype?.name === this.targetModuleName - ) || null - ); - } - private getProvidersAndControllers(module) { + + private getProvidersAndControllers(module: Module) { return [...module.providers.values(), ...module.controllers.values()]; } private lookupMethods( @@ -56,6 +60,9 @@ export class MetaTegsScannerService { const method = prototype[methodName]; const event = this.reflector.get(metaTeg, method); const boundHandler = instance[methodName].bind(instance); - if (event) rmqMessagesMap.set(event, boundHandler); + if (event) { + rmqMessagesMap.set(event, boundHandler); + this.logger.log('Mapped ' + event, MESSAGE_ROUTER); + } } } diff --git a/lib/constants.ts b/lib/constants.ts index 6307206..1e95495 100644 --- a/lib/constants.ts +++ b/lib/constants.ts @@ -7,6 +7,8 @@ export const TARGET_MODULE = 'TARGET_MODULE'; export const INITIALIZATION_STEP_DELAY = 400; export const DEFAULT_TIMEOUT = 40000; +export const MESSAGE_ROUTER = 'MessageRouterExplorer'; +export const MODULE_TOKEN = 'MODULE_UNIQ_TOKEN'; export const INDICATE_ERROR = 'Please indicate `replyToQueue`'; export const TIMEOUT_ERROR = 'Response timeout error'; diff --git a/lib/decorators/index.ts b/lib/decorators/index.ts new file mode 100644 index 0000000..7e85c7d --- /dev/null +++ b/lib/decorators/index.ts @@ -0,0 +1,2 @@ +export * from './rmq-message.decorator'; +export * from './transform.decorator'; diff --git a/lib/decorators/rmq-message.decorator.ts b/lib/decorators/rmq-message.decorator.ts index d5c4c0a..3a20f90 100644 --- a/lib/decorators/rmq-message.decorator.ts +++ b/lib/decorators/rmq-message.decorator.ts @@ -1,6 +1,6 @@ import { RMQ_MESSAGE_META_TEG } from '../constants'; -export function RMQEvent(event: string) { +export function MessageRoute(event: string) { return function (target: any, propertyKey: string | symbol, descriptor: any) { Reflect.defineMetadata(RMQ_MESSAGE_META_TEG, event, descriptor.value); }; diff --git a/lib/index.ts b/lib/index.ts index 9c41bbb..4f2ceba 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,2 +1,3 @@ export * from './rmq.module'; export * from './rmq.service'; +export * from './decorators'; diff --git a/lib/interfaces/rmq-options.interface.ts b/lib/interfaces/rmq-options.interface.ts index 56fc0b3..695557c 100644 --- a/lib/interfaces/rmq-options.interface.ts +++ b/lib/interfaces/rmq-options.interface.ts @@ -34,7 +34,6 @@ export interface IMessageBroker { replyTo?: Options.AssertQueue; queue?: IQueue; messageTimeout?: number; - targetModuleName: string; serviceName?: string; } export interface IBindQueue { diff --git a/lib/rmq-connect.service.ts b/lib/rmq-connect.service.ts index 1c15f37..ae4b4b7 100644 --- a/lib/rmq-connect.service.ts +++ b/lib/rmq-connect.service.ts @@ -24,7 +24,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { private declared = false; constructor( - @Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig, + @Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig ) {} async onModuleInit(): Promise { if (this.declared) throw Error('Root RmqNestjsModule already declared!'); @@ -34,18 +34,18 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { } public async assertExchange( - options: IExchange, + options: IExchange ): Promise { try { const exchange = await this.baseChannel.assertExchange( options.exchange, options.type, - options.options, + options.options ); return exchange; } catch (error) { throw new Error( - `Failed to assert exchange '${options.exchange}': ${error.message}`, + `Failed to assert exchange '${options.exchange}': ${error.message}` ); } } @@ -56,19 +56,19 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { } public async assertQueue( typeQueue: TypeQueue, - options: IQueue, + options: IQueue ): Promise { try { if (typeQueue == TypeQueue.QUEUE) { const queue = await this.baseChannel.assertQueue( options.queue, - options.options, + options.options ); return queue; } const queue = await this.replyToChannel.assertQueue( options.queue || '', - options.options, + options.options ); return queue; } catch (error) { @@ -81,11 +81,11 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { bindQueue.queue, bindQueue.source, bindQueue.pattern, - bindQueue.args, + bindQueue.args ); } catch (error) { throw new Error( - `Failed to Bind Queue ,source:${bindQueue.source} queue: ${bindQueue.queue}`, + `Failed to Bind Queue ,source:${bindQueue.source} queue: ${bindQueue.queue}` ); } } @@ -96,7 +96,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { Buffer.from(JSON.stringify(sendToQueueOptions.content)), { correlationId: sendToQueueOptions.correlationId, - }, + } ); } catch (error) { throw new Error(`Failed to send Reply Queue`); @@ -104,7 +104,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { } async listenReplyQueue( queue: string, - listenQueue: (msg: ConsumeMessage | null) => void, + listenQueue: (msg: ConsumeMessage | null) => void ) { try { await this.replyToChannel.consume(queue, listenQueue, { @@ -116,7 +116,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { } async listenQueue( queue: string, - listenQueue: (msg: ConsumeMessage | null) => void, + listenQueue: (msg: ConsumeMessage | null) => void ): Promise { try { await this.baseChannel.consume(queue, listenQueue, { @@ -136,7 +136,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { { replyTo: sendMessage.options.replyTo, correlationId: sendMessage.options.correlationId, - }, + } ); } catch (error) { throw new Error(`Failed to send message ${error}`); diff --git a/lib/rmq-core.module.ts b/lib/rmq-core.module.ts index b96fcd8..d770b95 100644 --- a/lib/rmq-core.module.ts +++ b/lib/rmq-core.module.ts @@ -1,7 +1,6 @@ import { DynamicModule, Module, Global } from '@nestjs/common'; import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants'; import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces'; - import { RmqNestjsConnectService } from './rmq-connect.service'; import { IAppOptions } from './interfaces/app-options.interface'; diff --git a/lib/rmq.module.ts b/lib/rmq.module.ts index 4c0cc42..011d29e 100644 --- a/lib/rmq.module.ts +++ b/lib/rmq.module.ts @@ -5,9 +5,9 @@ import { IRMQSRootAsyncOptions, IRabbitMQConfig, } from './interfaces'; -import { RMQ_BROKER_OPTIONS, TARGET_MODULE } from './constants'; +import { MODULE_TOKEN, RMQ_BROKER_OPTIONS, TARGET_MODULE } from './constants'; import { DiscoveryModule } from '@nestjs/core'; -import { MetaTegsScannerService } from './common'; +import { MetaTegsScannerService, getUniqId } from './common'; import { RmqNestjsCoreModule } from './rmq-core.module'; import { IAppOptions } from './interfaces/app-options.interface'; @@ -37,11 +37,11 @@ export class RmqNestjsModule { imports: [DiscoveryModule], providers: [ { provide: RMQ_BROKER_OPTIONS, useValue: options }, - { provide: TARGET_MODULE, useValue: options.targetModuleName }, + { provide: MODULE_TOKEN, useFactory: getUniqId }, RmqService, MetaTegsScannerService, ], - exports: [RmqService, MetaTegsScannerService], + exports: [RmqService, MetaTegsScannerService, MODULE_TOKEN], }; } } diff --git a/lib/rmq.service.ts b/lib/rmq.service.ts index 9dd8e03..858fc27 100644 --- a/lib/rmq.service.ts +++ b/lib/rmq.service.ts @@ -5,7 +5,6 @@ import { OnModuleDestroy, OnModuleInit, } from '@nestjs/common'; - import { IMessageBroker, IPublishOptions, TypeQueue } from './interfaces'; import { IMetaTegsMap } from './interfaces/metategs'; import { @@ -13,6 +12,7 @@ import { INDICATE_ERROR, INITIALIZATION_STEP_DELAY, INOF_NOT_FULL_OPTIONS, + MODULE_TOKEN, RECIVED_MESSAGE_ERROR, RMQ_APP_OPTIONS, RMQ_BROKER_OPTIONS, @@ -41,6 +41,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { private readonly metaTegsScannerService: MetaTegsScannerService, @Inject(RMQ_BROKER_OPTIONS) private options: IMessageBroker, @Inject(RMQ_APP_OPTIONS) private appOptions: IAppOptions, + @Inject(MODULE_TOKEN) private readonly moduleToken: string ) { this.logger = appOptions.logger ? appOptions.logger @@ -48,8 +49,10 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { } async onModuleInit() { - this.rmqMessageTegs = - this.metaTegsScannerService.scan(RMQ_MESSAGE_META_TEG); + this.rmqMessageTegs = this.metaTegsScannerService.scan( + RMQ_MESSAGE_META_TEG, + this.moduleToken + ); await this.init(); this.isInitialized = true; } @@ -57,7 +60,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { public async notify( topic: string, message: IMessage, - options?: IPublishOptions, + options?: IPublishOptions ) { await this.initializationCheck(); this.rmqNestjsConnectService.publish({ @@ -75,7 +78,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { public async send( topic: string, message: IMessage, - options?: IPublishOptions, + options?: IPublishOptions ): Promise { await this.initializationCheck(); if (!this.replyToQueue) return this.logger.error(INDICATE_ERROR); @@ -107,10 +110,10 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { }); } private async listenQueue(message: ConsumeMessage | null): Promise { - if (!message) return; const consumeFunction = this.rmqMessageTegs.get(message.fields.routingKey); + if (!consumeFunction) return; const result = await consumeFunction( - JSON.parse(message.content.toString()), + JSON.parse(message.content.toString()) ); if (message.properties.replyTo) { await this.rmqNestjsConnectService.sendToReplyQueue({ @@ -122,7 +125,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { this.rmqNestjsConnectService.ack(message); } private async listenReplyQueue( - message: ConsumeMessage | null, + message: ConsumeMessage | null ): Promise { if (message.properties.correlationId) { this.sendResponseEmitter.emit(message.properties.correlationId, message); @@ -131,7 +134,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { private async init() { this.exchange = await this.rmqNestjsConnectService.assertExchange( - this.options.exchange, + this.options.exchange ); if (this.options.replyTo) await this.assertReplyQueueBind(); await this.bindQueueExchange(); @@ -139,12 +142,12 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { private async bindQueueExchange() { if (!this.options.queue || !this.rmqMessageTegs?.size) return this.logger.warn( - this.options.targetModuleName, INOF_NOT_FULL_OPTIONS, + this.options.exchange.exchange ); const queue = await this.rmqNestjsConnectService.assertQueue( TypeQueue.QUEUE, - this.options.queue, + this.options.queue ); this.rmqMessageTegs.forEach(async (_, key) => { await this.rmqNestjsConnectService.bindQueue({ @@ -155,24 +158,24 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { }); await this.rmqNestjsConnectService.listenQueue( this.options.queue.queue, - this.listenQueue.bind(this), + this.listenQueue.bind(this) ); } private async assertReplyQueueBind() { this.replyToQueue = await this.rmqNestjsConnectService.assertQueue( TypeQueue.REPLY_QUEUE, - { queue: '', options: this.options.replyTo }, + { queue: '', options: this.options.replyTo } ); await this.rmqNestjsConnectService.listenReplyQueue( this.replyToQueue.queue, - this.listenReplyQueue.bind(this), + this.listenReplyQueue.bind(this) ); } private async initializationCheck() { if (this.isInitialized) return; await new Promise((resolve) => - setTimeout(resolve, INITIALIZATION_STEP_DELAY), + setTimeout(resolve, INITIALIZATION_STEP_DELAY) ); await this.initializationCheck(); } diff --git a/test/mocks/rmq-nestjs.module.ts b/test/mocks/rmq-nestjs.module.ts index e534ae1..74ce6df 100644 --- a/test/mocks/rmq-nestjs.module.ts +++ b/test/mocks/rmq-nestjs.module.ts @@ -1,5 +1,5 @@ import { Module } from '@nestjs/common'; -import { RmqNestjsModule, RmqService } from '../../lib'; +import { RmqNestjsModule } from '../../lib'; import { RmqEvents } from './rmq.event'; import { RmqServieController } from './rmq.controller'; @@ -12,9 +12,7 @@ import { RmqServieController } from './rmq.controller'; options: { durable: true }, }, queue: { queue: 'test-for', options: { durable: true } }, - replyTo: { durable: true }, - targetModuleName: 'ConnectionMockModule', }), ], providers: [RmqEvents, RmqServieController], diff --git a/test/mocks/rmq.event.ts b/test/mocks/rmq.event.ts index d427ee3..eb1263c 100644 --- a/test/mocks/rmq.event.ts +++ b/test/mocks/rmq.event.ts @@ -1,9 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { RMQEvent } from '../../lib/decorators/rmq-message.decorator'; +import { MessageRoute } from '../../lib/decorators/rmq-message.decorator'; @Injectable() export class RmqEvents { - @RMQEvent('hi') + @MessageRoute('hi') hi() { return { message: 'hi' }; }