任务队列
队列是一种强大的设计模式,可帮助您应对常见的应用程序扩展和性能挑战。队列可以帮助您解决的一些问题:
- 平滑处理峰值。可以在任意时间启动资源密集型任务,然后将这些任务添加到队列中,而不是同步执行。让任务进程以受控方式从队列中提取任务。也可以轻松添加新的队列消费者以扩展后端任务处理。
- 分解可能会阻塞 Node.js 事件循环的单一任务。比如用户请求需要像音频转码这样的 CPU 密集型工作,就可以将此任务委托给其他进程,从而释放面向用户的进程以保持响应。
- 提供跨各种服务的可靠通信渠道。例如,您可以在一个进程或服务中排队任务(作业),并在另一个进程或服务中使用它们。在任何流程或服务的作业生命周期中完成、错误或其他状态更改时,您都可以收到通知(通过监听状态事件)。当队列生产者或消费者失败时,它们的状态被保留,并且当节点重新启动时任务处理可以自动重新启动。
Midway 提供了 @midwayjs/bullmq
包作为 BullMQ 之上的抽象/包装器。BullMQ 是 Bull 的下一代实现,提供了更好的性能和更多的功能。该软件包可以轻松地将 BullMQ 以友好的方式集成到您的应用程序中。
BullMQ 使用 Redis 来保存作业数据,在使用 Redis 时,Queue 架构是完全分布式,和平台无关。例如,您可以在一个(或多个)节点(进程)中运行一些 Queue 生产者、消费者,而在其他节点上的运行其他生产者和消费者。
BullMQ 是一个分布式任务管理系统,必须依赖 redis
由于 BullMQ 是 Bull 的升级版,从 v3.20 开始,将由 BullMQ 替代 Bull 组件,如需使用 Bull 组件,请参考 Bull 文档
相关信息:
描述 | |
---|---|
可用于标准项目 | ✅ |
可用于 Serverless | ❌ |
可用于一体化 | ✅ |
包含独立主框架 | ✅ |
包含独立日志 | ✅ |
安装组件
$ npm i @midwayjs/bullmq@3 --save
或者在 package.json
中增加如下依赖后,重新安装。
{
"dependencies": {
"@midwayjs/bullmq": "^3.0.0",
// ...
},
}
使用组件
将 bullmq 组件配置到代码中。
import { Configuration } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';
@Configuration({
imports: [
// ...
bullmq
]
})
export class MainConfiguration {
//...
}
一些概念
BullMQ 将整个队列分为以下几个部分:
- Queue:队列,管理任务
- Job:每个任务对象,可以对任务进行启停控制
- Worker:任务处理器,实际的逻辑执行部分
- QueueEvents:队列事件,用于监听任务状态变化
- FlowProducer:任务流生产者,用于创建任务依赖关系
基础配置
bullmq 是一个分布式任务管理器,强依赖于 redis,在 config.default.ts
文件中配置。
// src/config/config.default.ts
export default {
// ...
bullmq: {
defaultConnection: {
host: '127.0.0.1',
port: 6379,
},
// 可选,队列前缀
defaultPrefix: '{midway-bullmq}',
},
}
有账号密码情况:
// src/config/config.default.ts
export default {
// ...
bullmq: {
defaultConnection: {
port: 6379,
host: '127.0.0.1',
password: 'foobared',
}
},
}
所有的队列、任务处理器、队列事件、任务流都会复用该配置。
编写任务处理器
使用 @Processor
装饰器装饰一个类,用于快速定义一个任务处理器。
@Processor
装饰器需要传递一个 Queue(队列)的名字,在框架启动时,如果没有名为 test
的队列,则会自动创建。
比如,我们在 src/processor/test.processor.ts
文件中编写如下代码。
import { Processor, IProcessor } from '@midwayjs/bullmq';
@Processor('test')
export class TestProcessor implements IProcessor {
async execute(data: any) {
// 处理任务逻辑
console.log('processing job:', data);
}
}
执行任务
当定义完 Processor 之后,由于并未指定 Processor 如何执行,我们还需要手动执行它。
手动执行任务
import { Configuration, Inject } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';
@Configuration({
imports: [
bullmq
]
})
export class MainConfiguration {
@Inject()
bullmqFramework: bullmq.Framework;
async onServerReady() {
// 获取 Processor 相关的队列
const testQueue = this.bullmqFramework.getQueue('test');
// 立即执行这个任务
await testQueue?.runJob();
}
}
增加执行参数
我们也可以在执行时,附加一些参数。
@Processor('test')
export class TestProcessor implements IProcessor {
async execute(params) {
// params.name => 'harry'
}
}
// invoke
const testQueue = this.bullmqFramework.getQueue('test');
await testQueue?.runJob({
name: 'harry'
});
任务状态和管理
执行 runJob
后,我们可以获取到一个 Job
对象。
const testQueue = this.bullmqFramework.getQueue('test');
const job = await testQueue?.runJob();
// 更新进度
await job.updateProgress(60);
// 获取进度
const progress = await job.progress;
// => 60
// 获取任务状态
const state = await job.getState();
// state => 'delayed' 延迟状态
// state => 'completed' 完成状态
// state => 'failed' 失败状态
延迟执行
执行任务时,也有一些额外的选项。
比如,延迟 1s 执行。
const testQueue = this.bullmqFramework.getQueue('test');
await testQueue?.runJob({}, { delay: 1000 });
任务重试
BullMQ 支持任务失败重试机制。
const testQueue = this.bullmqFramework.getQueue('test');
await testQueue?.runJob({}, {
attempts: 3, // 最多重试 3 次
backoff: { // 重试策略
type: 'exponential', // 指数退避
delay: 1000 // 初始延迟 1 秒
}
});
任务优先级
可以为任务设置优先级,优先级高的任务会优先执行。
const testQueue = this.bullmqFramework.getQueue('test');
// priority 值越大优先级越高
await testQueue?.runJob({ priority: 1 }, { priority: 3 }); // 高优先级
await testQueue?.runJob({ priority: 2 }, { priority: 2 }); // 中优先级
await testQueue?.runJob({ priority: 3 }, { priority: 1 }); // 低优先级
中间件和错误处理
BullMQ 组件包含可以独立启动的 Framework,有着自己的 App 对象和 Context 结构。
我们可以对 bullmq 的 App 配置独立的中间件和错误过滤器。
@Configuration({
imports: [
bullmq
]
})
export class MainConfiguration {
@App('bullmq')
bullmqApp: bullmq.Application;
async onReady() {
this.bullmqApp.useMiddleware(/*中间件*/);
this.bullmqApp.useFilter(/*过滤器*/);
}
}
上下文
任务处理器执行是在请求作用域中,其有着特殊的 Context 对象结构。
export interface Context extends IMidwayContext {
jobId: string;
job: Job;
token?: string;
from: new (...args) => IProcessor;
}
我们可以直接从 ctx 中访问当前的 Job 对象。
import { Processor, IProcessor, Context } from '@midwayjs/bullmq';
@Processor('test')
export class TestProcessor implements IProcessor {
@Inject()
ctx: Context;
async execute(data: any) {
// ctx.jobId => 当前任务ID
// ctx.job => 当前任务对象
}
}
重复执行的任务
除了手动执行的方式,我们也可以通过 @Processor
装饰器的参数,快速配置任务的重复执行。
import { Processor, IProcessor } from '@midwayjs/bullmq';
import { FORMAT } from '@midwayjs/core';
@Processor('test', {
repeat: {
pattern: FORMAT.CRONTAB.EVERY_PER_5_SECOND
}
})
export class TestProcessor implements IProcessor {
async execute() {
// 每 5 秒执行一次
}
}
高级功能
任务流(Flow Producer)
BullMQ 支持创建任务依赖关系,形成任务流。
const flowProducer = bullmqFramework.createFlowProducer({}, 'test-flow');
// 创建任务流
await flowProducer.add({
name: 'flow-test',
queueName: 'flow-queue-1',
data: { value: 1 },
children: [
{
name: 'child-job',
queueName: 'flow-queue-2',
data: { value: 2 }
}
]
});
队列事件
BullMQ 提供了丰富的事件系统,可以监听任务的各种状态变化。
const eventQueue = bullmqFramework.createQueue('event-queue');
const queueEvents = eventQueue.createQueueEvents();
// 监听任务完成事件
queueEvents.on('completed', ({ jobId }) => {
console.log(`Job ${jobId} completed!`);
});
// 监听任务失败事件
queueEvents.on('failed', ({ jobId, failedReason }) => {
console.log(`Job ${jobId} failed: ${failedReason}`);
});
清理任务历史记录
当开启 Redis 后,默认情况下,bullmq 会记录所有的成功和失败的任务 key,这可能会导致 redis 的 key 暴涨,我们可以配置成功或者失败后清理的选项。
// src/config/config.default.ts
export default {
bullmq: {
defaultQueueOptions: {
defaultJobOptions: {
removeOnComplete: 3, // 成功后只保留最近 3 条记录
removeOnFail: 10, // 失败后只保留最近 10 条记录
}
}
}
}
Redis 集群
bullmq 可以指定 connection 实例,你可以将自己创建的 Redis 实例配置到 defaultConnection
中,这样就可以接入 Redis 集群。
// src/config/config.default.ts
import Redis from 'ioredis';
const clusterOptions = {
enableReadyCheck: false,
retryDelayOnClusterDown: 300,
retryDelayOnFailover: 1000,
retryDelayOnTryAgain: 3000,
slotsRefreshTimeout: 10000,
maxRetriesPerRequest: null
}
const redisClientInstance = new Redis.Cluster([
{
port: 7000,
host: '127.0.0.1'
},
{
port: 7002,
host: '127.0.0.1'
},
], clusterOptions);
export default {
bullmq: {
defaultConnection: redisClientInstance,
defaultPrefix: '{midway-bullmq}',
}
}
组件日志
组件有着自己的日志,默认会将 ctx.logger
记录在 midway-bullmq.log
中。
我们可以单独配置这个 logger 对象。
export default {
midwayLogger: {
clients: {
bullMQLogger: {
fileLogName: 'midway-bullmq.log',
},
},
},
}
这个日志的输出格式,我们也可以单独配置。
export default {
bullmq: {
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`;
},
}
}
BullMQ 原始对象
组件导出了 BullMQ 的原始对象,可以进行更多的操作。
import { BullMQ } from '@midwayjs/bullmq';
你可以通过 BullMQ
对象,获取到 Queue
、Worker
、FlowProducer
等对象定义。
Bull UI
在分布式场景中,我们可以资利用 Bull UI 来简化管理。
和 bull 组件类似,需要独立安装和启用。
$ npm i @midwayjs/bull-board@3 --save
或者在 package.json
中增加如下依赖后,重新安装。
{
"dependencies": {
"@midwayjs/bull-board": "^3.0.0",
// ...
},
}
将 bull-board 组件配置到代码中。
import { Configuration } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';
import * as bullBoard from '@midwayjs/bull-board';
@Configuration({
imports: [
// ...
bullmq,
bullBoard,
]
})
export class MainConfiguration {
//...
}
默认的访问路径为:http://127.1:7001/ui
。
效果如下:
可以通过配置进行基础路径的修改。
// src/config/config.prod.ts
export default {
// ...
bullBoard: {
basePath: '/ui',
},
}
此外,组件提供了 BullBoardManager
,可以添加动态创建的队列。
import { Configuration, Inject } from '@midwayjs/core';
import * as bullmq from '@midwayjs/bullmq';
import * as bullBoard from '@midwayjs/bull-board';
@Configuration({
imports: [
// ...
bullmq,
bullBoard
]
})
export class MainConfiguration {
@Inject()
bullmqFramework: bullmq.Framework;
@Inject()
bullBoardManager: bullBoard.BullBoardManager;
async onReady() {
const testQueue = this.bullmqFramework.createQueue('test', {
// ...
});
this.bullBoardManager.addQueue(testQueue);
}
}