Skip to content

dollarsignteam/nestjs-amqp

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

NestJS AMQP 1.0

AMQP 1.0 module for Nest

GitHub Workflow Status npm (scoped) GitHub license

It is based on the rhea-promise package and inspired by nest-amqp package.

Features

  • Multiple connection
  • Multiple consumer
  • Concurrency

Installation

Yarn

yarn add @dollarsign/nestjs-amqp

NPM

npm install --save @dollarsign/nestjs-amqp

Usage

Connection URI

protocol://[username:password@]host:port

Create connection

import { AMQPModule } from '@dollarsign/nestjs-amqp';
import { Module } from '@nestjs/common';

import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    AMQPModule.forRoot({
      connectionUri: 'amqp://admin:admin@localhost:5671',
    }),
    AMQPModule.forRootAsync({
      name: 'custom',
      useFactory: () => {
        return {
          connectionOptions: {
            hostname: 'localhost',
            port: 5672,
            username: 'admin',
            password: 'admin',
            reconnect: true,
          },
        };
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Lifecycle hook enableShutdownHooks method

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule, { logger });
  const port = process.env.APP_PORT || 3000;
  app.enableShutdownHooks();
  await app.listen(port);
}

Send and Receive message

import { Logger } from '@dollarsign/logger';
import { Consumer, MessageControl, ProducerService, SendOptions } from '@dollarsign/nestjs-amqp';
import { delay } from '@dollarsign/utils';
import { Injectable } from '@nestjs/common';

import { SimpleMessage } from './interfaces';

@Injectable()
export class AppService {
  private readonly delayTime = 2000;
  private readonly logger = new Logger({
    name: AppService.name,
    displayFilePath: false,
  });

  constructor(private readonly producer: ProducerService) {}

  getHello(): string {
    return 'Hello World!';
  }

  async sendMessage(): Promise<string> {
    const body = { timestamp: new Date().toISOString() };
    const result = await this.producer.send<SimpleMessage>('demo1', body);
    const status = result.status ? 'success' : 'failed';
    return `Send to demo1 of default connection: ${status}`;
  }

  getRandomGroupId(): string {
    const index = Math.floor(Math.random() * 100) % 2;
    const groups = ['GroupA', 'GroupB'];
    return groups[index];
  }

  async sendMessageWithOptions(): Promise<string> {
    const messageId = new Date().getTime();
    const groupId = this.getRandomGroupId();
    const options: SendOptions = {
      connectionName: 'custom',
      group_id: groupId,
      correlation_id: `GROUP:${groupId}`,
      message_id: messageId,
      message_annotations: {
        JMSMessageID: 'A',
      },
    };
    const body = { timestamp: new Date().toISOString() };
    const result = await this.producer.send<SimpleMessage>('demo2', body, options);
    const status = result.status ? 'success' : 'failed';
    return `Send to demo2 of custom connection: ${status}`;
  }

  async sendError(): Promise<string> {
    const message = { timestamp: new Date().toISOString() };
    const result = await this.producer.send<SimpleMessage>('demo3', message);
    const status = result.status ? 'success' : 'failed';
    return `Send to demo3 of default connection: ${status}`;
  }

  @Consumer('demo1')
  async receiveMessage(body: SimpleMessage): Promise<void> {
    this.logger.info('Received from demo1', body);
    await delay(this.delayTime);
  }

  @Consumer('demo2', { connectionName: 'custom', concurrency: 2 })
  async receiveMessageWithOptions(body: SimpleMessage, control: MessageControl): Promise<void> {
    const { message_id, group_id } = control.message;
    this.logger.info(`Received from demo2 id: ${message_id}, ${group_id}`, body);
    await delay(this.delayTime);
    control.accept();
  }

  @Consumer('demo3')
  async receiveError(body: SimpleMessage): Promise<void> {
    this.logger.info('Received from demo3', body);
    await delay(this.delayTime);
    throw new Error(`Created at ${body.timestamp}`);
  }
}

Consumer options

export interface ConsumerOptions {
  // consumer count
  concurrency?: number;
  // connection name
  connectionName?: string;
  // number of messages to be processed at one time of each consumer
  parallelMessageProcessing?: number;
}

Message control

// accept the message
control.accept();

// reject the message
control.reject('Processing failed');

// release the message
control.release();

// get context
const context = control.context;

// get message
const context = control.message;

Contributing

Contributions welcome! See Contributing.

Author

Dollarsign

License

Licensed under the MIT License - see the LICENSE file for details.