Search…
Message Queue
This is a nest library for using SQS

Quickstart

To use this module locally in dev/testing it's probably easiest to use a SQS compatible server that can run in docker, like Localstack
Simple docker-compose.yml might look something like
1
services:
2
localstack:
3
image: localstack/localstack
4
networks:
5
- local
6
ports:
7
- 4566:4566
8
environment:
9
- SERVICES=sqs
10
11
networks:
12
local:
Copied!
Import the queue module in your nest module
1
import { QueueModule } from '@island.is/message-queue'
2
3
// Register the queue in your nest module.
4
// Note you can call QueueModule.register multiple times with different names
5
// if you need mulitple queues
6
@Module({
7
imports: [
8
QueueModule.register({
9
queue: {
10
// identifier for using the queue
11
name: 'name-of-queue',
12
// name of queue in AWS
13
queueName: process.env.QUEUE_NAME,
14
// you probably want a dead-letter queue
15
// leave options empty for default settings
16
deadLetterQueue: {},
17
},
18
client: {
19
// endpoint should be 'http://localhost:4566' for localstack but can be left undefined in production
20
endpoint: process.env.SQS_ENDPOINT,
21
region: 'eu-west-1',
22
credentials: {
23
// both keys can be set to 'testing' for use with localstack
24
accessKeyId: process.env.SQS_ACCESS_KEY,
25
secretAccessKey: process.env.SQS_SECRET_ACCESS_KEY,
26
},
27
},
28
}),
29
],
30
})
31
export class MyModule {}
Copied!
Push messages to the queue
1
import { InjectQueue, QueueService } from '@island.is/message-queue'
2
import { SomeMessageType } from './types'
3
4
@Injectable()
5
export class SomeService {
6
constructor(@InjectQueue('name-of-queue') private queue: QueueService) {}
7
8
async addToQueue() {
9
// msg can be any json-serializable object
10
const msg: SomeMessageType = { some: 'data' }
11
const uuid = await this.queue.add(msg)
12
// Up to you if or what you use the uuid for
13
}
14
}
Copied!
Consume messages from the queue
1
import { InjectWorker, WorkerService } from '@island.is/message-queue'
2
import { SomeMessageType } from './types'
3
4
@Injectable()
5
export class SomeWorkerService {
6
constructor(@InjectWorker('name-of-queue') private worker: WorkerService) {}
7
8
async run() {
9
await this.worker.run(async (message: SomeMessageType) => {
10
console.log('Yay, got message', message)
11
})
12
}
13
}
Copied!
Run the worker from your application main.ts
1
const worker = async () => {
2
const app = await NestFactory.createApplicationContext(AppModule)
3
app.enableShutdownHooks()
4
await app.get(SomeWorkerService).run()
5
}
6
7
// might need some check here if we should run the worker or the main application/webserver
8
worker()
Copied!

System Flow Sequence Diagram

1
sequenceDiagram
2
autonumber
3
Institution->>Advania:adds new message in mailbox
4
Advania->>User-Notification-Service:xroad:sends message notification
5
User-Notification-Service->>AWS SQS:adds processed notification to queue
6
loop
7
User-Notification-Worker->>AWS SQS:requests 10 notifications
8
AWS SQS->>User-Notification-Worker:responds with 0-10 notifications
9
end
10
loop if worker throws an unexpected error, it will try 2 more times with 10 minute interval
11
User-Notification-Worker->>User-Profile-Service:requests notification settings
12
User-Profile-Service->>User-Notification-Worker:returns user settings and tokens
13
end
14
User-Notification-Worker->>Firebase Cloud Messaging: sends notification
15
Firebase Cloud Messaging->>island.is app:sends notification
Copied!

Running unit tests

Run yarn test message-queue to execute the unit tests via Jest.