跳到主要内容
版本:3.0.0

任务队列

队列是一种强大的设计模式,可帮助您应对常见的应用程序扩展和性能挑战。队列可以帮助您解决的一些问题。

示例如下:

  • 平滑处理峰值。可以在任意时间启动资源密集型任务,然后将这些任务添加到队列中,而不是同步执行。让任务进程以受控方式从队列中提取任务。也可以轻松添加新的队列消费者以扩展后端任务处理。
  • 分解可能会阻塞 Node.js 事件循环的单一任务。比如用户请求需要像音频转码这样的 CPU 密集型工作,就可以将此任务委托给其他进程,从而释放面向用户的进程以保持响应。
  • 提供跨各种服务的可靠通信渠道。例如,您可以在一个进程或服务中排队任务(作业),并在另一个进程或服务中使用它们。在任何流程或服务的作业生命周期中完成、错误或其他状态更改时,您都可以收到通知(通过监听状态事件)。当队列生产者或消费者失败时,它们的状态被保留,并且当节点重新启动时任务处理可以自动重新启动。

Midway 提供了 @midwayjs/bull 包作为 Bull 之上的抽象/包装器,Bull 是一种流行的、受良好支持的、高性能的基于 Node.js 的队列系统实现。该软件包可以轻松地将 Bull Queues 以友好的方式集成到您的应用程序中。

Bull 使用 Redis 来保存作业数据,在使用 Redis 时,Queue 架构是完全分布式,和平台无关。例如,您可以在一个(或多个)节点(进程)中运行一些 Queue 生产者、消费者,而在其他节点上的运行其他生产者和消费者。

本章介绍 @midwayjs/bull 包。我们还建议阅读 Bull 文档 以了解更多背景和具体实施细节。

提示
  • 1、从 v3.6.0 开始,原有任务调度 @midwayjs/task 模块废弃,如果查询历史文档,请参考 这里
  • 2、bull 是一个分布式任务管理系统,必须依赖 redis

相关信息:

描述
可用于标准项目
可用于 Serverless
可用于一体化
包含独立主框架
包含独立日志

安装组件

$ npm i @midwayjs/bull@3 --save

或者在 package.json 中增加如下依赖后,重新安装。

{
"dependencies": {
"@midwayjs/bull": "^3.0.0",
// ...
},
}

使用组件

将 bull 组件配置到代码中。

import { Configuration } from '@midwayjs/core';
import * as bull from '@midwayjs/bull';

@Configuration({
imports: [
// ...
bull
]
})
export class MainConfiguration {
//...
}

一些概念

Bull 将整个队列分为三个部分

  • 1、Queue 队列,管理任务
  • 2、Job,每个任务对象,可以对任务进行启停控制
  • 3、Processor,任务处理,实际的逻辑执行部分

基础配置

bull 是一个分布式任务管理器,强依赖于 redis,在 config.default.ts 文件中配置。

// src/config/config.default.ts
export default {
// ...
bull: {
// 默认的队列配置
defaultQueueOptions: {
redis: `redis://127.0.0.1:32768`,
}
},
}

有账号密码情况:

// src/config/config.default.ts
export default {
// ...
bull: {
defaultQueueOptions: {
redis: {
port: 6379,
host: '127.0.0.1',
password: 'foobared',
},
}
},
}

所有的队列都会复用该配置。

编写任务处理器

使用 @Processor 装饰器装饰一个类,用于快速定义一个任务处理器(这里我们不使用 Job,避免后续的歧义)。

@Processor 装饰器需要传递一个 Queue (队列)的名字,在框架启动时,如果没有名为 test 的队列,则会自动创建。

比如,我们在 src/queue/test.queue.ts 文件中编写如下代码。

// src/queue/test.queue.ts
import { Processor, IProcessor } from '@midwayjs/bull';

@Processor('test')
export class TestProcessor implements IProcessor {
async execute() {
// ...
}
}

在启动时,框架会自动查找并初始化上述处理器代码,同时自动创建一个名为 test 的 Queue。

执行任务

当定义完 Processor 之后,由于并未指定 Processor 如何执行,我们还需要手动执行它。

通过获取对应的队列,我们可以很方便的来执行任务。

手动执行任务

比如,我们可以在项目启动后执行。

import { Configuration, Inject } from '@midwayjs/core';
import * as bull from '@midwayjs/bull';

@Configuration({
imports: [
// ...
bull
]
})
export class MainConfiguration {

@Inject()
bullFramework: bull.Framework;

//...

async onServerReady() {
// 获取 Processor 相关的队列
const testQueue = this.bullFramework.getQueue('test');
// 立即执行这个任务
await testQueue?.runJob();
}
}

增加执行参数

我们也可以在执行时,附加一些默认参数。

@Processor('test')
export class TestProcessor implements IProcessor {
async execute(params) {
// params.aaa => 1
}
}


// invoke
const testQueue = this.bullFramework.getQueue('test');
// 立即执行这个任务
await testQueue?.runJob({
aaa: 1,
bbb: 2,
});

任务状态和管理

执行 runJob 后,我们可以获取到一个 Job 对象。

// invoke
const testQueue = this.bullFramework.getQueue('test');
const job = await testQueue?.runJob();

通过这个 Job 对象,我们可以做进度管理。

// 更新进度
await job.progress(60);
// 获取进度
const progress = await job.process();
// => 60

获取任务状态。

const state = await job.getState();
// state => 'delayed' 延迟状态
// state => 'completed' 完成状态

更多的 Job API,请查看 文档

延迟执行

执行任务时,也有一些额外的选项。

比如,延迟 1s 执行。

const testQueue = this.bullFramework.getQueue('test');
// 立即执行这个任务
await testQueue?.runJob({}, { delay: 1000 });

中间件和错误处理

Bull 组件包含可以独立启动的 Framework,有着自己的 App 对象和 Context 结构。

我们可以对 bull 的 App 配置独立的中间件和错误过滤器。

@Configuration({
imports: [
// ...
bull
]
})
export class MainConfiguration {

@App('bull')
bullApp: bull.Application;

//...

async onReady() {
this.bullApp.useMiddleare( /*中间件*/);
this.bullApp.useFilter( /*过滤器*/);
}
}

上下文

任务处理器执行是在请求作用域中,其有着特殊的 Context 对象结构。

export interface Context extends IMidwayContext {
jobId: JobId;
job: Job,
from: new (...args) => IProcessor;
}

我们可以直接从 ctx 中访问当前的 Job 对象。

// src/queue/test.queue.ts
import { Processor, IProcessor, Context } from '@midwayjs/bull';

@Processor('test')
export class TestProcessor implements IProcessor {

@Inject()
ctx: Context;

async execute() {
// ctx.jobId => xxxx
}
}

更多任务选项

除了上面的 delay 之外,还有更多的执行选项。

选项类型描述
prioritynumber可选的优先级值。范围从 1(最高优先级)到 MAX_INT(最低优先级)。请注意,使用优先级对性能有轻微影响,因此请谨慎使用。
delaynumber等待可以处理此作业的时间量(毫秒)。请注意,为了获得准确的延迟,服务器和客户端都应该同步它们的时钟。
attemptsnumber在任务完成之前尝试尝试的总次数。
repeatRepeatOpts根据 cron 规范的重复任务配置,更多可以查看 RepeatOpts,以及下面的重复任务介绍。
backoffnumber | BackoffOpts任务失败时自动重试的回退设置。请参阅 BackoffOpts
lifoboolean如果为 true,则将任务添加到队列的右端而不是左端(默认为 false)。
timeoutnumber任务因超时错误而失败的毫秒数。
jobIdnumber | string覆盖任务 id - 默认情况下,任务 id 是唯一整数,但您可以使用此设置覆盖它。如果您使用此选项,则由您来确保 jobId 是唯一的。如果您尝试添加一个 id 已经存在的任务,它将不会被添加。
removeOnCompleteboolean | number如果为 true,则在成功完成后删除任务。如果设置数字,则为指定要保留的任务数量。默认行为是任务信息保留在已完成列表中。
removeOnFailboolean | number如果为 true,则在所有尝试后都失败时删除任务。如果设置数字,指定要保留的任务数量。默认行为是将任务信息保留在失败列表中。
stackTraceLimitnumber限制将在堆栈跟踪中记录的堆栈跟踪行的数量。

重复执行的任务

除了手动执行的方式,我们也可以通过 @Processor 装饰器的参数,快速配置任务的重复执行。

import { Processor, IProcessor } from '@midwayjs/bull';
import { FORMAT } from '@midwayjs/core';

@Processor('test', {
repeat: {
cron: FORMAT.CRONTAB.EVERY_PER_5_SECOND
}
})
export class TestProcessor implements IProcessor {
@Inject()
logger;

async execute() {
// ...
}
}

常用 Cron 表达式

关于 Cron 表达式,格式如下。

*    *    *    *    *    *
┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ |
│ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
│ │ │ │ └───── month (1 - 12)
│ │ │ └────────── day of month (1 - 31)
│ │ └─────────────── hour (0 - 23)
│ └──────────────────── minute (0 - 59)
└───────────────────────── second (0 - 59, optional)

常见表达式:

  • 每隔5秒执行一次:*/5 * * * * *
  • 每隔1分钟执行一次:0 */1 * * * *
  • 每小时的20分执行一次:0 20 * * * *
  • 每天 0 点执行一次:0 0 0 * * *
  • 每天的两点35分执行一次:0 35 2 * * *

可以使用 在线工具 执行确认下一次执行的时间。

Midway 在框架侧提供了一些常用的表达式,放在 @midwayjs/core 中供大家使用。

import { FORMAT } from '@midwayjs/core';

// 每分钟执行的 cron 表达式
FORMAT.CRONTAB.EVERY_MINUTE

内置的还有一些其他的表达式。

表达式对应时间
CRONTAB.EVERY_SECOND每秒钟
CRONTAB.EVERY_MINUTE每分钟
CRONTAB.EVERY_HOUR每小时整点
CRONTAB.EVERY_DAY每天 0 点
CRONTAB.EVERY_DAY_ZERO_FIFTEEN每天 0 点 15 分
CRONTAB.EVERY_DAY_ONE_FIFTEEN每天 1 点 15 分
CRONTAB.EVERY_PER_5_SECOND每隔 5 秒
CRONTAB.EVERY_PER_10_SECOND每隔 10 秒
CRONTAB.EVERY_PER_30_SECOND每隔 30 秒
CRONTAB.EVERY_PER_5_MINUTE每隔 5 分钟
CRONTAB.EVERY_PER_10_MINUTE每隔 10 分钟
CRONTAB.EVERY_PER_30_MINUTE每隔 30 分钟

高级配置

清理之前的任务

在默认情况下,框架会自动清理前一次未调度的 重复执行任务,保持每一次的重复执行的任务队列为最新。如果在某些环境不需要清理,可以单独关闭。

比如你不需要清理重复:

// src/config/config.prod.ts
export default {
// ...
bull: {
clearRepeatJobWhenStart: false,
},
}
提示

如果不清理,如果前一次队列为 10s 执行,现在修改为 20s 执行,则两个定时都会存储在 Redis 中,导致代码重复执行。

在日常的开发中,如果不清理,很容易出现代码重复执行这个问题。但是在集群部署的场景,多台服务器轮流重启的情况下,可能会导致定时任务被意外清理,请评估开关的时机。

也可以在启动时手动清理所有任务。

// src/configuration.ts
import { Configuration, App, Inject } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
import { join } from 'path';
import * as bull from '@midwayjs/bull';

@Configuration({
imports: [koa, bull],
importConfigs: [join(__dirname, './config')],
})
export class MainConfiguration {
@App()
app: koa.Application;

@Inject()
bullFramework: bull.Framework;

async onReady() {
// 在这个阶段,装饰器队列还未创建,使用 API 提前手动创建队列,装饰器会复用同名队列
const queue = this.bullFramework.createQueue('user');
// 通过队列手动执行清理
await queue.obliterate({ force: true });
}
}

清理任务历史记录

当开启 Redis 后,默认情况下,bull 会记录所有的成功和失败的任务 key,这可能会导致 redis 的 key 暴涨,我们可以配置成功或者失败后清理的选项。

默认情况下

  • 成功时保留的任务记录为 3 条
  • 失败保留的任务记录为 10 条

也可以通过参数进行配置。

比如在装饰器配置。

import { FORMAT } from '@midwayjs/core';
import { IProcessor, Processor } from '@midwayjs/bull';

@Processor('user', {
repeat: {
cron: FORMAT.CRONTAB.EVERY_MINUTE,
},
removeOnComplete: 3, // 成功后移除任务记录,最多保留最近 3 条记录
removeOnFail: 10, // 失败后移除任务记录
})
export class UserService implements IProcessor {
execute(data: any) {
// ...
}
}

也可以在全局 config 中配置。

// src/config/config.default.ts
export default {
// ...
bull: {
defaultQueueOptions: {
// 默认的任务配置
defaultJobOptions: {
// 保留 10 条记录
removeOnComplete: 10,
},
},
},
}

Redis 集群

可以使用 bull 提供的 createClient 方式来接入自定义的 redis 实例,这样你可以接入 Redis 集群。

比如:

// src/config/config.default
import Redis from 'ioredis';

const clusterOptions = {
enableReadyCheck: false, // 一定要是false
retryDelayOnClusterDown: 300,
retryDelayOnFailover: 1000,
retryDelayOnTryAgain: 3000,
slotsRefreshTimeout: 10000,
maxRetriesPerRequest: null // 一定要是null
}

const redisClientInstance = new Redis.Cluster([
{
port: 7000,
host: '127.0.0.1'
},
{
port: 7002,
host: '127.0.0.1'
},
], clusterOptions);

export default {
bull: {
defaultQueueOptions: {
createClient: (type, opts) => {
return redisClientInstance;
},
// 这些任务存储的 key,都是相同开头,以便区分用户原有 redis 里面的配置
prefix: '{midway-bull}',
},
}
}

队列管理

队列是廉价的,每个 Job 都会绑定一个队列,在一些情况下,我们也可以手动对队列进行管理操作。

手动创建队列

除了使用 @Processor 简单定义队列,我们还可以使用 API 进行创建。

import { Configuration, Inject } from '@midwayjs/core';
import * as bull from '@midwayjs/bull';

@Configuration({
imports: [
// ...
bull
]
})
export class MainConfiguration {

@Inject()
bullFramework: bull.Framework;

async onReady() {
const testQueue = this.bullFramework.createQueue('test', {
redis: {
port: 6379,
host: '127.0.0.1',
password: 'foobared',
},
prefix: '{midway-bull}',
});

// ...
}
}

通过 createQueue 手动创建队列后,队列依旧会自动保存。如果在启动时 @Processor 使用了该队列名,则会自动使用已经创建好的队列。

比如:

// 会自动使用上面手动创建的同名队列
@Processor('test')
export class TestProcessor implements IProcessor {
async execute(params) {
}
}

获取队列

我们可以简单的根据队列名获取队列。

 const testQueue = bullFramework.getQueue('test');

也可以通过装饰器来获取。

import { InjectQueue, BullQueue } from '@midwayjs/bull';
import { Provide } from '@midwayjs/core';

@Provide()
export class UserService {
@InjectQueue('test')
testQueue: BullQueue;

async invoke() {
await this.testQueue.pause();
// ...
}
}

队列常用操作

暂停队列。

await testQueue.pause();

继续队列。

await testQueue.resume();

队列事件。

// Local events pass the job instance...
testQueue.on('progress', function (job, progress) {
console.log(`Job ${job.id} is ${progress * 100}% ready!`);
});

testQueue.on('completed', function (job, result) {
console.log(`Job ${job.id} completed! Result: ${result}`);
job.remove();
});

完整队列 API 请参考 这里

组件日志

组件有着自己的日志,默认会将 ctx.logger 记录在 midway-bull.log 中。

我们可以单独配置这个 logger 对象。

export default {
midwayLogger: {
clients: {
// ...
bullLogger: {
fileLogName: 'midway-bull.log',
},
},
},
}

这个日志的输出格式,我们也可以单独配置。

export default {
bull: {
// ...
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} [${jobId} ${from.name}] ${info.message}`;
},
}
}

关于 Redis 版本

请尽可能选择最新的版本( >=5 ),目前在低版本 redis 上有发现定时任务创建失败的问题。

Bull UI

在分布式场景中,我们可以资利用 Bull UI 来简化管理。

和 bull 组件类似,需要独立安装和启用。

$ npm i @midwayjs/bull-board@3 --save

或者在 package.json 中增加如下依赖后,重新安装。

{
"dependencies": {
"@midwayjs/bull-board": "^3.0.0",
// ...
},
}

将 bull-board 组件配置到代码中。

import { Configuration } from '@midwayjs/core';
import * as bull from '@midwayjs/bull';
import * as bullBoard from '@midwayjs/bull-board';

@Configuration({
imports: [
// ...
bull,
bullBoard,
]
})
export class MainConfiguration {
//...
}

默认的访问路径为:http://127.1:7001/ui

效果如下:

可以通过配置进行基础路径的修改。

// src/config/config.prod.ts
export default {
// ...
bullBoard: {
basePath: '/ui',
},
}

此外,组件提供了 BullBoardManager ,可以添加动态创建的队列。

import { Configuration, Inject } from '@midwayjs/core';
import * as bull from '@midwayjs/bull';
import * as bullBoard from '@midwayjs/bull-board';

@Configuration({
imports: [
// ...
bull,
bullBoard
]
})
export class MainConfiguration {

@Inject()
bullFramework: bull.Framework;

@Inject()
bullBoardManager: bullBoard.BullBoardManager;

async onReady() {
const testQueue = this.bullFramework.createQueue('test', {
// ...
});

this.bullBoardManager.addQueue(testQueue);
}
}

常见问题

1、EVALSHA 错误

image.png

这个问题基本明确,问题会出现在 redis 的集群版本上。

原因是 redis 会对 key 做 hash 来确定存储的 slot,集群下这一步 @midwayjs/bull 的 key 命中了不同的 slot。

解决方案: task 里的 prefix 配置用 {} 包括,强制 redis 只计算 {} 里的hash,例如 prefix: '{midway-task}'

2、EVAL inside MULTI is not allowed 错误

表现为 queue.createBulk()job.moveToFailed() 等任务队列 API 调用无效,并出现下面的错误。

ReplyError: EXECABORT Transaction discarded because of previous errors.
at parseError (<project_dir>/node_modules/redis-parser/lib/parser.js:179:12)
at parseType (<project_dir>/node_modules/redis-parser/lib/parser.js:302:14) {
command: { name: 'exec', args: [] },
previousErrors: [
ReplyError: ERR 'EVAL' inside MULTI is not allowed
at parseError (<project_dir>/node_modules/redis-parser/lib/parser.js:179:12)
at parseType (<project_dir>/node_modules/redis-parser/lib/parser.js:302:14) {
command: [Object]
}
]
}
提示

常出现于使用阿里云 Redis 服务。

由于这些 API 依赖的 Redis Lua 脚本中使用了 EVAL 或者 EVALSHA,阿里云 Redis 使用代理模式连接时,会对 Lua 脚本调用做额外限制,包括 不允许在 MULTI 事务中执行 EVAL 命令,文档中还提到可以通过参数配置 script_check_enable 关闭这一校验,但是验证无效。

解决方案:

  • 1、在阿里云控制台操作开启直连地址,将服务切换到直连模式
  • 2、客户端切换成集群模式,参考上述「Redis 集群」章节,切换配置方式