Aller au contenu principal
Version: 3.0.0

Task Queue

Queue is a powerful design pattern that can help you handle common application scaling and performance challenges. Here are some problems that queues can help you solve:

  • Smooth out processing peaks. You can start resource-intensive tasks at any time, add them to a queue instead of executing them synchronously. Let task processes pull tasks from the queue in a controlled manner. You can also easily add new queue consumers to scale backend task processing.
  • Break down single tasks that might block the Node.js event loop. For example, if a user request requires CPU-intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive.
  • Provide reliable communication channels across various services. For example, you can queue tasks (jobs) in one process or service and consume them in another. You can receive notifications (by listening to status events) when jobs complete, fail, or undergo other status changes in any process or service's job lifecycle. When queue producers or consumers fail, their state is preserved, and task processing can automatically restart when nodes restart.

Midway provides the @midwayjs/bullmq package as an abstraction/wrapper on top of BullMQ. BullMQ is the next-generation implementation of Bull, offering better performance and more features. This package makes it easy to integrate BullMQ into your application in a friendly way.

BullMQ uses Redis to store job data. When using Redis, the Queue architecture is completely distributed and platform-independent. For example, you can run some Queue producers and consumers in one (or more) nodes (processes) while running other producers and consumers on other nodes.

astuce

bullmq is a distributed task management system that requires redis

attention

Since BullMQ is a successor to Bull, starting with v3.20, it will replace the Bull component. If you need to use the Bull component, please refer to the Bull documentation.

Related Information:

Description
Available for standard projects
Available for Serverless
Available for Integration
Contains independent main framework
Contains independent logs

Installation

$ npm i @midwayjs/bullmq@3 --save

Or add the following dependency to your package.json and reinstall.

{
"dependencies": {
"@midwayjs/bullmq": "^3.0.0",
// ...
},
}

Using the Component

Configure the bullmq component in your code.

import { Configuration } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';

@Configuration({
imports: [
// ...
bullmq
]
})
export class MainConfiguration {
//...
}

Core Concepts

BullMQ divides the entire queue system into the following parts:

  • Queue: Manages tasks
  • Job: Each task object that can be controlled (start/stop)
  • Worker: Task processor that executes the actual logic
  • QueueEvents: Queue events for monitoring task status changes
  • FlowProducer: Task flow producer for creating task dependencies

Basic Configuration

bullmq is a distributed task manager that heavily depends on redis. Configure it in the config.default.ts file.

// src/config/config.default.ts
export default {
// ...
bullmq: {
defaultConnection: {
host: '127.0.0.1',
port: 6379,
},
// Optional, queue prefix
defaultPrefix: '{midway-bullmq}',
},
}

With username and password:

// src/config/config.default.ts
export default {
// ...
bullmq: {
defaultConnection: {
port: 6379,
host: '127.0.0.1',
password: 'foobared',
}
},
}

All queues, processors, queue events, and flow producers will reuse this configuration.

Writing Task Processors

Use the @Processor decorator to quickly define a task processor.

The @Processor decorator requires a Queue name. If a queue with the specified name doesn't exist when the framework starts, it will be automatically created.

For example, write the following code in src/processor/test.processor.ts:

import { Processor, IProcessor } from '@midwayjs/bullmq';

@Processor('test')
export class TestProcessor implements IProcessor {
async execute(data: any) {
// Process task logic
console.log('processing job:', data);
}
}

Executing Tasks

After defining a Processor, since we haven't specified how to execute it, we need to run it manually.

Manual Execution

import { Configuration, Inject } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';

@Configuration({
imports: [
bullmq
]
})
export class MainConfiguration {

@Inject()
bullmqFramework: bullmq.Framework;

async onServerReady() {
// Get the queue associated with the Processor
const testQueue = this.bullmqFramework.getQueue('test');
// Execute the task immediately
await testQueue?.runJob();
}
}

Adding Execution Parameters

We can attach additional parameters when executing tasks.

@Processor('test')
export class TestProcessor implements IProcessor {
async execute(params) {
// params.name => 'harry'
}
}

// invoke
const testQueue = this.bullmqFramework.getQueue('test');
await testQueue?.runJob({
name: 'harry'
});

Task Status and Management

After executing runJob, we get a Job object.

const testQueue = this.bullmqFramework.getQueue('test');
const job = await testQueue?.runJob();

// Update progress
await job.updateProgress(60);
// Get progress
const progress = await job.progress;
// => 60

// Get task status
const state = await job.getState();
// state => 'delayed'
// state => 'completed'
// state => 'failed'

Delayed Execution

Tasks can be executed with additional options.

For example, to delay execution by 1 second:

const testQueue = this.bullmqFramework.getQueue('test');
await testQueue?.runJob({}, { delay: 1000 });

Task Retry

BullMQ supports task failure retry mechanism.

const testQueue = this.bullmqFramework.getQueue('test');
await testQueue?.runJob({}, {
attempts: 3, // Maximum 3 retry attempts
backoff: { // Retry strategy
type: 'exponential', // Exponential backoff
delay: 1000 // Initial delay of 1 second
}
});

Task Priority

Tasks can be assigned priorities, with higher priority tasks executing first.

const testQueue = this.bullmqFramework.getQueue('test');
// Higher priority value means higher priority
await testQueue?.runJob({ priority: 1 }, { priority: 3 }); // High priority
await testQueue?.runJob({ priority: 2 }, { priority: 2 }); // Medium priority
await testQueue?.runJob({ priority: 3 }, { priority: 1 }); // Low priority

Middleware and Error Handling

The BullMQ component includes an independently startable Framework with its own App object and Context structure.

We can configure independent middleware and error filters for the bullmq App.

@Configuration({
imports: [
bullmq
]
})
export class MainConfiguration {

@App('bullmq')
bullmqApp: bullmq.Application;

async onReady() {
this.bullmqApp.useMiddleware(/*middleware*/);
this.bullmqApp.useFilter(/*filter*/);
}
}

Context

Task processors execute in request scope and have a special Context object structure.

export interface Context extends IMidwayContext {
jobId: string;
job: Job;
token?: string;
from: new (...args) => IProcessor;
}

We can access the current Job object directly from ctx.

import { Processor, IProcessor, Context } from '@midwayjs/bullmq';

@Processor('test')
export class TestProcessor implements IProcessor {
@Inject()
ctx: Context;

async execute(data: any) {
// ctx.jobId => current task ID
// ctx.job => current job object
}
}

Repeatable Tasks

Besides manual execution, we can quickly configure repeatable task execution through the @Processor decorator parameters.

import { Processor, IProcessor } from '@midwayjs/bullmq';
import { FORMAT } from '@midwayjs/core';

@Processor('test', {
repeat: {
pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND
}
})
export class TestProcessor implements IProcessor {
async execute() {
// Executes every 5 seconds
}
}

Advanced Features

Task Flow (Flow Producer)

BullMQ supports creating task dependencies to form task flows.

const flowProducer = bullmqFramework.createFlowProducer({}, 'test-flow');

// Create task flow
await flowProducer.add({
name: 'flow-test',
queueName: 'flow-queue-1',
data: { value: 1 },
children: [
{
name: 'child-job',
queueName: 'flow-queue-2',
data: { value: 2 }
}
]
});

Queue Events

BullMQ provides a rich event system for monitoring various task status changes.

const eventQueue = bullmqFramework.createQueue('event-queue');
const queueEvents = eventQueue.createQueueEvents();

// Listen for task completion
queueEvents.on('completed', ({ jobId }) => {
console.log(`Job ${jobId} completed!`);
});

// Listen for task failure
queueEvents.on('failed', ({ jobId, failedReason }) => {
console.log(`Job ${jobId} failed: ${failedReason}`);
});

Cleaning Task History

When Redis is enabled, bullmq records all successful and failed task keys by default, which may cause Redis keys to grow rapidly. We can configure cleanup options for successful or failed tasks.

// src/config/config.default.ts
export default {
bullmq: {
defaultQueueOptions: {
defaultJobOptions: {
removeOnComplete: 3, // Keep only the last 3 records after success
removeOnFail: 10, // Keep only the last 10 records after failure
}
}
}
}

Redis Cluster

bullmq allows you to specify a connection instance. You can configure your own Redis instance in defaultConnection to connect to a Redis cluster.

// src/config/config.default.ts
import Redis from 'ioredis';

const clusterOptions = {
enableReadyCheck: false,
retryDelayOnClusterDown: 300,
retryDelayOnFailover: 1000,
retryDelayOnTryAgain: 3000,
slotsRefreshTimeout: 10000,
maxRetriesPerRequest: null
}

const redisClientInstance = new Redis.Cluster([
{
port: 7000,
host: '127.0.0.1'
},
{
port: 7002,
host: '127.0.0.1'
},
], clusterOptions);

export default {
bullmq: {
defaultConnection: redisClientInstance,
defaultPrefix: '{midway-bullmq}',
}
}

Component Logging

The component has its own logs, by default recording ctx.logger in midway-bullmq.log.

We can configure this logger object separately.

export default {
midwayLogger: {
clients: {
bullMQLogger: {
fileLogName: 'midway-bullmq.log',
},
},
},
}

We can also configure the log output format separately.

export default {
bullmq: {
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`;
},
}
}

BullMQ Original Objects

The component exports the original BullMQ objects, which can be used for more operations.

import { BullMQ } from '@midwayjs/bullmq';

Through the BullMQ object, you can access object definitions such as Queue, Worker, FlowProducer, etc.

Bull UI

In a distributed scenario, we can leverage the Bull UI to simplify management.

Similar to the bull component, it needs to be installed and enabled independently.

$ npm i @midwayjs/bull-board@3 --save

Or reinstall it after adding the following dependencies to package.json.

{
"dependencies": {
"@midwayjs/bull-board": "^3.0.0",
// ...
},
}

Configure the bull-board component into the code.

import { Configuration } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';
import * as bullBoard from '@midwayjs/bull-board';

@Configuration({
imports: [
// ...
bullmq,
bullBoard,
]
})
export class MainConfiguration {
//...
}

The default access path is: http://127.1:7001/ui.

The effect is as follows.

The base path can be modified by configuration.

// src/config/config.prod.ts
export default {
// ...
bullBoard: {
basePath: '/ui',
},
}

In addition, the component provides the BullBoardManager class, which can add queues dynamically created.

import { Configuration, Inject } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';
import * as bullBoard from '@midwayjs/bull-board';

@Configuration({
imports: [
// ...
bullmq,
bullBoard
]
})
export class MainConfiguration {

@Inject()
bullmqFramework: bullmq.Framework;

@Inject()
bullBoardManager: bullBoard.BullBoardManager;

async onReady() {
const testQueue = this.bullmqFramework.createQueue('test', {
// ...
});

this.bullBoardManager.addQueue(testQueue);
}
}