Skip to content

Commit

Permalink
fix: ack message
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jul 1, 2024
1 parent 029298a commit 4644495
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
RMQ_MESSAGE_META_TEG,
TIMEOUT_ERROR,
} from './constants';
import { ConsumeMessage, Message, Replies, Channel } from 'amqplib';
import { ConsumeMessage, Message, Replies, Channel, Options } from 'amqplib';
import { MetaTegsScannerService } from './common';
import { RmqNestjsConnectService } from './rmq-connect.service';
import { getUniqId } from './common/get-uniqId';
Expand Down Expand Up @@ -69,7 +69,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
public notify<IMessage>(
topic: string,
message: IMessage,
options?: IPublishOptions,
options?: Options.Publish,
): INotifyReply {
this.initializationCheck();
this.rmqNestjsConnectService.publish({
Expand Down Expand Up @@ -130,15 +130,17 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
const consumeFunction = this.rmqMessageTegs.get(route);
let result = { error: ERROR_NO_ROUTE };
if (consumeFunction)
result = await consumeFunction(JSON.parse(message.content.toString()));
result = await consumeFunction(
JSON.parse(message.content.toString()),
message,
);
if (message.properties.replyTo) {
await this.rmqNestjsConnectService.sendToReplyQueue({
replyTo: message.properties.replyTo,
content: result || { status: 'recived' },
correlationId: message.properties.correlationId,
});
}
this.ack(message);
}

private async listenReplyQueue(
Expand Down

0 comments on commit 4644495

Please sign in to comment.