跳到主要内容
版本:3.0.0

Kafka

在复杂系统的架构中,事件流是很重要的一环,包括从事件源中(数据库、传感器、移动设备等)以事件流的方式去实时捕获数据,持久化事件流方便检索,并实时和回顾操作处理响应事件流。

应用于支付和金融交易、实施跟踪和监控汽车等行业信息流动、捕获分析物联网数据等等。

在 Midway中,我们提供了订阅 Kafka 的能力,专门来满足用户的这类需求。

相关信息:

订阅服务

描述
可用于标准项目
可用于 Serverless
可用于一体化
包含独立主框架
包含独立日志

基础概念

分布式流处理平台

  • 发布订阅(流)信息
  • 容错(故障转移)存储信息(流),存储事件流
  • 在消息流发生的时候进行处理,处理事件流

理解 Producer(生产者)

  • 发布消息到一个主题或多个 topic (主题)。

理解 Consumer(主题消费者)

  • 订阅一个或者多个 topic,并处理产生的信息。

理解 Stream API

  • 充当一个流处理器,从 1 个或多个 topic 消费输入流,并生产一个输出流到1个或多个输出 topic,有效地将输入流转换到输出流。

理解 Broker

  • 已发布的消息保存在一组服务器中,称之为 Kafka 集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

image.png

提示

从 v3.19 开始,Kafka 组件做了一次重构,Kafka 组件的配置、使用方法和之前都有较大差异,原有使用方式兼容,但是文档不再保留。

安装依赖

安装 @midwayjs/kafka 模块。

$ npm i @midwayjs/kafka --save

或者在 package.json 中增加如下依赖后,重新安装。

{
"dependencies": {
"@midwayjs/kafka": "^3.0.0",
// ...
}
}

开启组件

@midwayjs/kafka 可以作为独立主框架使用。

// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as kafka from '@midwayjs/kafka';

@Configuration({
imports: [
kafka
],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}

也可以附加在其他的主框架下,比如 @midwayjs/koa

// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
import * as kafka from '@midwayjs/kafka';

@Configuration({
imports: [
koa,
kafka
],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}

由于 Kafka 分为 消费者(Consumer)生产者(Producer) 两部分,两个可以独立使用,我们将分别介绍。

消费者(Consumer)

目录结构

我们一般把消费者放在 consumer 目录。比如 src/consumer/user.consumer.ts

➜  my_midway_app tree
.
├── src
│ ├── consumer
│ │ └── user.consumer.ts
│ ├── interface.ts
│ └── service
│ └── user.service.ts
├── test
├── package.json
└── tsconfig.json

基础配置

通过 consumer 字段和 @KafkaConsumer 装饰器,我们可以配置多个消费者。

比如,下面的 sub1sub2 就是两个不同的消费者。

// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
// ...
},
sub2: {
// ...
},
}
}
}

最简单的消费者配置需要几个字段,Kafka 的连接配置、消费者配置以及订阅配置。

// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
// ...
},
consumerOptions: {
// ...
},
subscribeOptions: {
// ...
},
},
}
}
}

比如:

// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumerOptions: {
groupId: 'groupId-test-1',
},
subscribeOptions: {
topics: ['topic-test-1'],
}
},
}
}
}

完整可配置参数包括:

  • connectionOptions:Kafka 的连接配置,即 new Kafka(consumerOptions) 的参数
  • consumerOptions:Kafka 的消费者配置,即 kafka.consumer(consumerOptions) 的参数
  • subscribeOptions:Kafka 的订阅配置,即 consumer.subscribe(subscribeOptions) 的参数
  • consumerRunConfig:消费者运行配置,即 consumer.run(consumerRunConfig) 的参数

这些参数的详细说明,可以参考 KafkaJS Consumer 文档。

复用 Kafka 实例

如果如果需要复用 Kafka 实例,可以通过 kafkaInstanceRef 字段来指定。

// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumerOptions: {
groupId: 'groupId-test-1',
},
subscribeOptions: {
topics: ['topic-test-1'],
}
},
sub2: {
kafkaInstanceRef: 'sub1',
consumerOptions: {
groupId: 'groupId-test-2',
},
subscribeOptions: {
topics: ['topic-test-2'],
}
}
}
}
}

注意,上述的 sub1sub2 是两个不同的消费者,但是它们共享同一个 Kafka 实例,且 sub2groupId 需要和 sub1 不同。

用 Kafka SDK 写法类似如下:

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});

const consumer1 = kafka.consumer({ groupId: 'groupId-test-1' });
const consumer2 = kafka.consumer({ groupId: 'groupId-test-2' });

消费者实现

我们可以在目录中提供一个标准的消费者实现,比如 src/consumer/sub1.consumer.ts

// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload } from '@midwayjs/kafka';

@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
async eachMessage(payload: EachMessagePayload) {
// ...
}
}

sub1 是消费者名称,使用的是配置中的 sub1 消费者。

也可以实现 eachBatch 方法,处理批量消息。

// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka';

@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
async eachBatch(payload: EachBatchPayload) {
// ...
}
}

消息上下文

和其他消息订阅机制一样,消息本身通过 Context 字段来传递。

// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka';
import { Inject } from '@midwayjs/core';

@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {

@Inject()
ctx: Context;

async eachMessage(payload: EachMessagePayload) {
// ...
}
}

Context 字段包括几个属性:

属性类型描述
ctx.payloadEachMessagePayload, EachBatchPayload消息内容
ctx.consumerConsumer消费者实例

你可以通过 ctx.consumer 来调用 Kafka 的 API,比如 ctx.consumer.commitOffsets 来手动提交偏移量或者 ctx.consumer.pause 来暂停消费。

生产者(Producer)

基础配置

服务生产者也需要创建实例,配置本身使用了 服务工厂 的设计模式。

配置如下:

// src/config/config.default
export default {
kafka: {
producer: {
clients: {
pub1: {
// ...
},
pub2: {
// ...
}
}
}
}
}

每个 Producer 实例的配置,同样包括 connectionOptionsproducerOptions

// src/config/config.default
export default {
kafka: {
producer: {
clients: {
pub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
producerOptions: {
// ...
}
}
}
}
}
}

具体参数可以参考 KafkaJS Producer 文档。

此外,由于 Kafka Consumer 和 Producer 都可以从同一个 Kafka 实例创建,所以它们可以复用同一个 Kafka 实例。

Producer 后于 Consumer 创建,也同样可以使用 kafkaInstanceRef 字段来复用 Kafka 实例。

// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
}
},
producer: {
clients: {
pub1: {
kafkaInstanceRef: 'sub1',
}
}
}
}
}

使用 Producer

Producer 不存在默认实例,由于使用了服务工厂的设计模式,所以可以通过 @InjectClient() 来注入。

// src/service/user.service.ts
import { Provide, InjectClient } from '@midwayjs/core';
import { KafkaProducerFactory, Producer } from '@midwayjs/kafka';

@Provide()
export class UserService {

@InjectClient(KafkaProducerFactory, 'pub1')
producer: Producer;

async invoke() {
await this.producer.send({
topic: 'topic-test-1',
messages: [{ key: 'message-key1', value: 'hello consumer 11 !' }],
});
}
}

Admin

Kafka 的 Admin 功能,可以用来创建、删除、查看主题,查看配置和 ACL 等。

基础配置

和 Producer 类似,Admin 也使用了服务工厂的设计模式。

// src/config/config.default
export default {
kafka: {
admin: {
clients: {
admin1: {
// ...
}
}
}
}
}

同样的,Admin 也可以复用 Kafka 实例。

// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
}
},
admin: {
clients: {
admin1: {
kafkaInstanceRef: 'sub1',
}
}
}
}
}

使用 Admin

Admin 不存在默认实例,由于使用了服务工厂的设计模式,所以可以通过 @InjectClient() 来注入。

// src/service/admin.service.ts
import { Provide, InjectClient } from '@midwayjs/core';
import { KafkaAdminFactory, Admin } from '@midwayjs/kafka';

@Provide()
export class AdminService {

@InjectClient(KafkaAdminFactory, 'admin1')
admin: Admin;
}

更多的 Admin 使用方法,可以参考 KafkaJS Admin 文档。

组件日志

Kafka 组件默认使用 kafkaLogger 日志,默认会将 ctx.logger 记录在 midway-kafka.log

你可以通过配置修改。

// src/config/config.default
export default {
midwayLogger: {
clients: {
kafkaLogger: {
fileLogName: 'midway-kafka.log',
},
},
},
}

这个日志的输出格式,我们也可以单独配置。

export default {
kafka: {
// ...
contextLoggerFormat: info => {
const { jobId, from } = info.ctx;
return `${info.timestamp} ${info.LEVEL} ${info.pid} ${info.message}`;
},
}
}

获取 KafkaJS 模块

KafkaJS 模块,可以通过 @midwayjs/kafkaKafkaJS 字段来获取。

import { KafkaJS } from '@midwayjs/kafka';

const { ConfigResourceTypes } = KafkaJS;
// ...

关于分区的警告

如果你使用的是 KafkaJS 的 v2.0.0 版本,你可能会看到如下的警告:

2024-11-04 23:47:28.228 WARN 31729 KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" { timestamp: '2024-11-04T15:47:28.228Z', logger: 'kafkajs' }

这个警告是由于 KafkaJS 的 v2.0.0 版本默认使用了新的分区器,如果接受新的分区器行为,但想要关闭这个警告消息,可以通过设置环境变量 KAFKAJS_NO_PARTITIONER_WARNING=1 来消除这个警告。

或者显示声明分区器。

// src/config/config.default
import { KafkaJS } from '@midwayjs/kafka';
const { Partitioners } = KafkaJS;

export default {
kafka: {
producer: {
clients: {
pub1: {
// ...
producerOptions: {
createPartitioner: Partitioners.DefaultPartitioner,
// ...
createPartitioner: Partitioners.LegacyPartitioner,
},
},
},
},
}
}

建议你查看 KafkaJS v2.0.0 的 迁移指南 了解更多细节。

参考文档