Skip to content

Commit 2bf3cb5

Browse files
authored
Add RabbitMQ retries (#58)
* add rabbitmq retries * minor version bump on rabbit
1 parent 2284f0f commit 2bf3cb5

10 files changed

Lines changed: 132 additions & 24 deletions

packages/bus-rabbitmq/README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,20 @@ container.load(new BusRabbitMqModule())
2828

2929
const rabbitConfiguration: RabbitMqTransportConfiguration = {
3030
queueName: 'accounts-application-queue',
31-
connectionString: 'amqp://guest:guest@localhost'
31+
connectionString: 'amqp://guest:guest@localhost',
32+
maxRetries: 5
3233
}
3334
container.bind(BUS_RABBITMQ_SYMBOLS.TransportConfiguration).toConstantValue(rabbitConfiguration)
3435
```
3536

37+
## Configuration Options
38+
39+
The RabbitMQ transport has the following configuration:
40+
41+
* **queueName** *(required)* The name of the service queue to create and read messages from.
42+
* **connectionString** *(required)* An amqp formatted connection string that's used to connect to the RabbitMQ instance
43+
* **maxRetries** *(optional)* The number of attempts to retry failed messages before they're routed to the dead letter queue. *Default: 10*
44+
3645
## Development
3746

3847
Local development can be done with the aid of docker to run the required infrastructure. To do so, run:

packages/bus-rabbitmq/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@node-ts/bus-rabbitmq",
33
"description": "A RabbitMQ transport adapter for @node-ts/bus-core.",
4-
"version": "0.4.1",
4+
"version": "0.5.0",
55
"license": "MIT",
66
"main": "./dist/index.js",
77
"types": "./dist/index.d.ts",

packages/bus-rabbitmq/src/rabbitmq-transport-configuration.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,10 @@ export interface RabbitMqTransportConfiguration {
99
* @example amqp://guest:guest@localhost
1010
*/
1111
connectionString: string
12+
13+
/**
14+
* The maximum number of attempts to retry a failed message before routing it to the dead letter queue.
15+
* @default 10
16+
*/
17+
maxRetries?: number
1218
}

packages/bus-rabbitmq/src/rabbitmq-transport.integration.ts

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1-
import { RabbitMqTransport } from './rabbitmq-transport'
2-
import { TestContainer, TestEvent, TestCommand, TestCommandHandler } from '../test'
1+
import { RabbitMqTransport, DEFAULT_MAX_RETRIES } from './rabbitmq-transport'
2+
import {
3+
TestContainer,
4+
TestEvent,
5+
TestCommand,
6+
TestCommandHandler,
7+
TestPoisonedMessageHandler,
8+
TestPoisonedMessage,
9+
HANDLE_CHECKER,
10+
HandleChecker
11+
} from '../test'
312
import { BUS_RABBITMQ_INTERNAL_SYMBOLS, BUS_RABBITMQ_SYMBOLS } from './bus-rabbitmq-symbols'
413
import { Connection, Channel, Message as RabbitMqMessage } from 'amqplib'
514
import { TransportMessage, BUS_SYMBOLS, ApplicationBootstrap, Bus } from '@node-ts/bus-core'
@@ -8,14 +17,16 @@ import * as faker from 'faker'
817
import { MessageAttributes } from '@node-ts/bus-messages'
918
import { TestSystemMessage } from '../test/test-system-message'
1019
import { TestSystemMessageHandler } from '../test/test-system-message-handler'
20+
import { Mock, IMock, It, Times } from 'typemoq'
1121

1222
export async function sleep (timeoutMs: number): Promise<void> {
1323
return new Promise(resolve => setTimeout(resolve, timeoutMs))
1424
}
1525

1626
const configuration: RabbitMqTransportConfiguration = {
1727
queueName: 'node-ts/bus-rabbitmq-test',
18-
connectionString: 'amqp://guest:guest@0.0.0.0'
28+
connectionString: 'amqp://guest:guest@0.0.0.0',
29+
maxRetries: 3
1930
}
2031

2132
describe('RabbitMqTransport', () => {
@@ -25,16 +36,20 @@ describe('RabbitMqTransport', () => {
2536
let channel: Channel
2637
let container: TestContainer
2738
let bootstrap: ApplicationBootstrap
39+
let handleChecker: IMock<HandleChecker>
2840

2941
beforeAll(async () => {
42+
handleChecker = Mock.ofType<HandleChecker>()
3043
container = new TestContainer()
44+
container.bind(HANDLE_CHECKER).toConstantValue(handleChecker.object)
3145
container.bind(BUS_RABBITMQ_SYMBOLS.TransportConfiguration).toConstantValue(configuration)
3246
bus = container.get(BUS_SYMBOLS.Bus)
3347
sut = container.get(BUS_SYMBOLS.Transport)
3448

3549
bootstrap = container.get<ApplicationBootstrap>(BUS_SYMBOLS.ApplicationBootstrap)
3650
bootstrap.registerHandler(TestCommandHandler)
3751
bootstrap.registerHandler(TestSystemMessageHandler)
52+
bootstrap.registerHandler(TestPoisonedMessageHandler)
3853

3954
const connectionFactory = container.get<() => Promise<Connection>>(BUS_RABBITMQ_INTERNAL_SYMBOLS.AmqpFactory)
4055
connection = await connectionFactory()
@@ -71,6 +86,35 @@ describe('RabbitMqTransport', () => {
7186
})
7287
})
7388

89+
describe('when retrying a poisoned message', () => {
90+
const poisonedMessage = new TestPoisonedMessage(faker.random.uuid())
91+
beforeAll(async () => {
92+
jest.setTimeout(10000)
93+
await bus.publish(poisonedMessage)
94+
await new Promise<void>(resolve => {
95+
channel.consume('dead-letter', msg => {
96+
if (!msg) {
97+
return
98+
}
99+
100+
channel.ack(msg)
101+
102+
const message = JSON.parse(msg.content.toString()) as TestPoisonedMessage
103+
if (message.id === poisonedMessage.id) {
104+
resolve()
105+
}
106+
})
107+
})
108+
})
109+
110+
it(`it should fail after configuration.maxRetries attempts`, () => {
111+
handleChecker.verify(
112+
h => h.check(It.is<TestPoisonedMessage>(m => m.id === poisonedMessage.id), It.isAny()),
113+
Times.exactly(configuration.maxRetries!)
114+
)
115+
})
116+
})
117+
74118
describe('when sending a command', () => {
75119
const command = new TestCommand()
76120
beforeEach(async () => {
@@ -91,23 +135,21 @@ describe('RabbitMqTransport', () => {
91135
const command = new TestSystemMessage()
92136
const channelName = command.name
93137

94-
beforeEach(async () => {
95-
await channel.assertExchange(command.name, 'fanout')
96-
await channel.bindQueue(configuration.queueName, command.name, '')
138+
beforeAll(async () => {
139+
jest.setTimeout(10000)
140+
channel.publish(channelName, '', Buffer.from(JSON.stringify(command)))
141+
await sleep(5000)
97142
})
98143

99-
afterEach(async () => {
144+
afterAll(async () => {
100145
await channel.deleteExchange(command.name)
101146
})
102147

103148
it('should handle system messages', async () => {
104-
channel.publish(channelName, '', Buffer.from(JSON.stringify(command)))
105-
jest.setTimeout(10000)
106-
await sleep(5000)
107-
const message = await sut.readNextMessage()
108-
109-
expect(message).toBeDefined()
110-
expect(message!.domainMessage).toMatchObject(command)
149+
handleChecker.verify(
150+
h => h.check(It.is<TestSystemMessage>(m => m.name === command.name), It.isAny()),
151+
Times.once()
152+
)
111153
})
112154
})
113155

packages/bus-rabbitmq/src/rabbitmq-transport.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { LOGGER_SYMBOLS, Logger } from '@node-ts/logger-core'
77
import { RabbitMqTransportConfiguration } from './rabbitmq-transport-configuration'
88
import { MessageType } from '@node-ts/bus-core/dist/handler/handler'
99

10+
export const DEFAULT_MAX_RETRIES = 10
1011
const deadLetterExchange = '@node-ts/bus-rabbitmq/dead-letter-exchange'
1112
const deadLetterQueue = 'dead-letter'
1213

@@ -19,6 +20,7 @@ export class RabbitMqTransport implements Transport<RabbitMqMessage> {
1920
private connection: Connection
2021
private channel: Channel
2122
private assertedExchanges: { [key: string]: boolean } = {}
23+
private maxRetries: number
2224

2325
constructor (
2426
@inject(BUS_RABBITMQ_INTERNAL_SYMBOLS.AmqpFactory)
@@ -27,6 +29,7 @@ export class RabbitMqTransport implements Transport<RabbitMqMessage> {
2729
private readonly configuration: RabbitMqTransportConfiguration,
2830
@inject(LOGGER_SYMBOLS.Logger) private readonly logger: Logger
2931
) {
32+
this.maxRetries = configuration.maxRetries || DEFAULT_MAX_RETRIES
3033
}
3134

3235
async initialize (handlerRegistry: HandlerRegistry): Promise<void> {
@@ -90,8 +93,16 @@ export class RabbitMqTransport implements Transport<RabbitMqMessage> {
9093
}
9194

9295
async returnMessage (message: TransportMessage<RabbitMqMessage>): Promise<void> {
93-
this.logger.debug('Returning message', { rawMessage: message.raw })
94-
this.channel.nack(message.raw)
96+
const msg = JSON.parse(message.raw.content.toString())
97+
const attempt = message.raw.fields.deliveryTag
98+
const meta = { attempt, message: msg, rawMessage: message.raw }
99+
if (attempt >= this.maxRetries) {
100+
this.logger.debug('Message retries failed, sending to dead letter queue', meta)
101+
this.channel.reject(message.raw, false)
102+
} else {
103+
this.logger.debug('Returning message', meta)
104+
this.channel.nack(message.raw)
105+
}
95106
}
96107

97108
private async assertExchange (messageName: string): Promise<void> {
@@ -104,7 +115,10 @@ export class RabbitMqTransport implements Transport<RabbitMqMessage> {
104115

105116
private async bindExchangesToQueue (handlerRegistry: HandlerRegistry): Promise<void> {
106117
await this.createDeadLetterQueue()
107-
await this.channel.assertQueue(this.configuration.queueName, { durable: true, deadLetterExchange })
118+
await this.channel.assertQueue(
119+
this.configuration.queueName,
120+
{ durable: true, deadLetterExchange }
121+
)
108122
const subscriptionPromises = handlerRegistry.messageSubscriptions
109123
.map(async subscription => {
110124

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { MessageAttributes } from '@node-ts/bus-messages'
2+
3+
export const HANDLE_CHECKER = Symbol.for('node-ts/bus-rabbitmq/integration/handle-checker')
4+
export interface HandleChecker {
5+
check<T extends object> (message: T, attributes: MessageAttributes): void
6+
}

packages/bus-rabbitmq/test/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@ export * from './test-container'
22
export * from './test-event'
33
export * from './test-command'
44
export * from './test-command-handler'
5+
export * from './test-poisoned-message'
6+
export * from './test-poisoned-message-handler'
7+
export * from './handler-checker'
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { HandlesMessage } from '@node-ts/bus-core'
2+
import { TestPoisonedMessage } from './test-poisoned-message'
3+
import { HANDLE_CHECKER, HandleChecker } from './handler-checker'
4+
import { inject } from 'inversify'
5+
import { MessageAttributes } from '@node-ts/bus-messages'
6+
7+
@HandlesMessage(TestPoisonedMessage)
8+
export class TestPoisonedMessageHandler {
9+
10+
constructor (
11+
@inject(HANDLE_CHECKER) private readonly handleChecker: HandleChecker
12+
) {
13+
}
14+
15+
async handle (message: TestPoisonedMessage, messageAttributes: MessageAttributes): Promise<void> {
16+
this.handleChecker.check(message, messageAttributes)
17+
throw new Error('This will be routed to the DLQ after the configuration.messageTtl retries have been exhausted')
18+
}
19+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { Message } from '@node-ts/bus-messages'
2+
3+
export class TestPoisonedMessage extends Message {
4+
static NAME = '@node-ts/bus-core/test-poisoned-message'
5+
$name = TestPoisonedMessage.NAME
6+
$version = 1
7+
8+
constructor (
9+
readonly id: string
10+
) {
11+
super()
12+
}
13+
}

packages/bus-rabbitmq/test/test-system-message-handler.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,7 @@ import { HandlesMessage } from '@node-ts/bus-core'
22
import { MessageAttributes } from '@node-ts/bus-messages'
33
import { inject } from 'inversify'
44
import { TestSystemMessage } from './test-system-message'
5-
6-
export const HANDLE_CHECKER = Symbol.for('node-ts/bus-rabbitmq/integration/handle-checker')
7-
export interface HandleChecker {
8-
check<T extends Object> (message: T, attributes: MessageAttributes): void
9-
}
5+
import { HANDLE_CHECKER, HandleChecker } from './handler-checker'
106

117
@HandlesMessage(
128
(m: TestSystemMessage) => m.name === TestSystemMessage.NAME,

0 commit comments

Comments
 (0)