diff --git a/lib/rmq-connect.service.ts b/lib/rmq-connect.service.ts index 8997823..e1cdfe9 100644 --- a/lib/rmq-connect.service.ts +++ b/lib/rmq-connect.service.ts @@ -185,6 +185,21 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { throw new Error(`Failed to send message ${error}`); } } + public sendToQueue( + queue: string, + message: IMessage, + options?: Options.Publish, + ): boolean { + try { + return this.baseChannel.sendToQueue( + queue, + Buffer.from(JSON.stringify(message)), + options, + ); + } catch (error) { + throw new Error(`Failed to send message ${error}`); + } + } private async setUpConnect(options: IRabbitMQConfig) { this.connection = await connect(options, this.globalOptions); this.isConnected = true; diff --git a/lib/rmq.global.service.ts b/lib/rmq.global.service.ts index ec8ac9f..ce00a46 100644 --- a/lib/rmq.global.service.ts +++ b/lib/rmq.global.service.ts @@ -1,4 +1,4 @@ -import { ConsumeMessage, Message, Replies, Channel } from 'amqplib'; +import { ConsumeMessage, Message, Replies, Channel, Options } from 'amqplib'; import { IGlobalOptions, INotifyReply, @@ -12,7 +12,7 @@ import { RMQ_APP_OPTIONS, TIMEOUT_ERROR, } from './constants'; -import { Inject, LoggerService, OnModuleInit } from '@nestjs/common'; +import { Inject, Logger, LoggerService, OnModuleInit } from '@nestjs/common'; import { getUniqId } from './common'; import { EventEmitter } from 'stream'; import { RQMColorLogger } from './common/logger'; @@ -64,11 +64,12 @@ export class RmqGlobalService implements OnModuleInit { }); }); } + public notify( exchange: string, topic: string, message: IMessage, - options?: IPublishOptions, + options?: Options.Publish, ): INotifyReply { this.rmqNestjsConnectService.publish({ exchange: exchange, @@ -83,6 +84,13 @@ export class RmqGlobalService implements OnModuleInit { return { status: 'ok' }; } + public sendToQueue( + ...args: [string, IMessage, Options.Publish?] + ): boolean { + const status = this.rmqNestjsConnectService.sendToQueue(...args); + return status; + } + public ack( ...params: Parameters ): ReturnType { @@ -103,6 +111,7 @@ export class RmqGlobalService implements OnModuleInit { await this.rmqNestjsConnectService.listenReplyQueue( this.replyToQueue.queue, this.listenReplyQueue.bind(this), + this.globalOptions.globalBroker.replyTo.consumOptions, ); } }