Skip to content

Commit

Permalink
feat: send to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jul 1, 2024
1 parent 35f1181 commit ed05d58
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
15 changes: 15 additions & 0 deletions lib/rmq-connect.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
throw new Error(`Failed to send message ${error}`);
}
}
public sendToQueue<IMessage>(
queue: string,
message: IMessage,
options?: Options.Publish,
): boolean {
try {
return this.baseChannel.sendToQueue(
queue,
Buffer.from(JSON.stringify(message)),
options,
);
} catch (error) {
throw new Error(`Failed to send message ${error}`);
}
}
private async setUpConnect(options: IRabbitMQConfig) {
this.connection = await connect(options, this.globalOptions);
this.isConnected = true;
Expand Down
15 changes: 12 additions & 3 deletions lib/rmq.global.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConsumeMessage, Message, Replies, Channel } from 'amqplib';
import { ConsumeMessage, Message, Replies, Channel, Options } from 'amqplib';
import {
IGlobalOptions,
INotifyReply,
Expand All @@ -12,7 +12,7 @@ import {
RMQ_APP_OPTIONS,
TIMEOUT_ERROR,
} from './constants';
import { Inject, LoggerService, OnModuleInit } from '@nestjs/common';
import { Inject, Logger, LoggerService, OnModuleInit } from '@nestjs/common';
import { getUniqId } from './common';
import { EventEmitter } from 'stream';
import { RQMColorLogger } from './common/logger';
Expand Down Expand Up @@ -64,11 +64,12 @@ export class RmqGlobalService implements OnModuleInit {
});
});
}

public notify<IMessage>(
exchange: string,
topic: string,
message: IMessage,
options?: IPublishOptions,
options?: Options.Publish,
): INotifyReply {
this.rmqNestjsConnectService.publish({
exchange: exchange,
Expand All @@ -83,6 +84,13 @@ export class RmqGlobalService implements OnModuleInit {
return { status: 'ok' };
}

public sendToQueue<IMessage>(
...args: [string, IMessage, Options.Publish?]
): boolean {
const status = this.rmqNestjsConnectService.sendToQueue(...args);
return status;
}

public ack(
...params: Parameters<Channel['ack']>
): ReturnType<Channel['ack']> {
Expand All @@ -103,6 +111,7 @@ export class RmqGlobalService implements OnModuleInit {
await this.rmqNestjsConnectService.listenReplyQueue(
this.replyToQueue.queue,
this.listenReplyQueue.bind(this),
this.globalOptions.globalBroker.replyTo.consumOptions,
);
}
}

0 comments on commit ed05d58

Please sign in to comment.