Kafka
In complex system architectures, event streams play a crucial role: capturing data in real time from event sources (databases, sensors, mobile devices, etc.) as event streams, persisting those streams durably for later retrieval, and processing and reacting to them both in real time and retrospectively.
Typical applications include payment and financial transactions, tracking and monitoring vehicle data streams, and capturing and analyzing IoT data.
In Midway, we provide the ability to subscribe to Kafka to meet these needs.
Related Information:
Subscription Service
Description | Supported |
---|---|
Available for standard projects | ✅ |
Available for Serverless | ❌ |
Available for integrated projects | ✅ |
Includes standalone main framework | ✅ |
Includes standalone logging | ✅ |
Basic Concepts
Kafka is a distributed stream processing platform that can:
- Publish and subscribe to streams of events
- Store streams of events durably and fault-tolerantly
- Process streams of events as they occur
Understanding Producer
- Publish messages to one or more topics.
Understanding Consumer
- Subscribe to one or more topics and process the messages published to them.
Understanding Stream API
- Acts as a stream processor, consuming input streams from one or more topics and producing an output stream to one or more output topics, effectively transforming input streams into output streams.
Understanding Broker
- Published messages are stored in a set of servers called a Kafka cluster. Each server in the cluster is a broker. Consumers can subscribe to one or more topics and pull data from brokers to consume these published messages.
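As a minimal sketch of how these roles map onto code, here is what they look like with the underlying KafkaJS client (the broker address, topic, and group names are placeholders):
// A rough sketch of the concepts above using the underlying KafkaJS client
import { Kafka } from 'kafkajs';

// Broker: the client connects to one or more brokers in the cluster
const kafka = new Kafka({ clientId: 'demo-app', brokers: ['localhost:9092'] });

async function demo() {
  // Producer: publishes messages to a topic
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({ topic: 'demo-topic', messages: [{ value: 'hello' }] });

  // Consumer: subscribes to topics and processes the messages
  const consumer = kafka.consumer({ groupId: 'demo-group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['demo-topic'] });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(topic, partition, message.value?.toString());
    },
  });
}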
Starting from v3.19, the Kafka component has been refactored, and its configuration and usage differ significantly from earlier versions. The original usage remains compatible, but its documentation is no longer retained.
Install Dependencies
Install the @midwayjs/kafka module.
$ npm i @midwayjs/kafka --save
Or add the following dependency to package.json and reinstall.
{
"dependencies": {
"@midwayjs/kafka": "^3.0.0",
// ...
}
}
Enable Component
@midwayjs/kafka can be used as a standalone main framework.
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as kafka from '@midwayjs/kafka';
@Configuration({
imports: [
kafka
],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}
It can also be attached to another main framework, such as @midwayjs/koa.
// src/configuration.ts
import { Configuration } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
import * as kafka from '@midwayjs/kafka';
@Configuration({
imports: [
koa,
kafka
],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}
Since Kafka is divided into Consumer and Producer parts, each can be used independently; we introduce them separately below.
Consumer
Directory Structure
We usually place consumers in the consumer directory, such as src/consumer/user.consumer.ts.
➜ my_midway_app tree
.
├── src
│ ├── consumer
│ │ └── user.consumer.ts
│ ├── interface.ts
│ └── service
│ └── user.service.ts
├── test
├── package.json
└── tsconfig.json
Basic Configuration
We can configure multiple consumers through the consumer field and the @KafkaConsumer decorator.
For example, sub1 and sub2 below are two different consumers.
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
// ...
},
sub2: {
// ...
},
}
}
}
The simplest consumer configuration requires several fields: Kafka connection configuration, consumer configuration, and subscription configuration.
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
// ...
},
consumerOptions: {
// ...
},
subscribeOptions: {
// ...
},
},
}
}
}
For example:
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumerOptions: {
groupId: 'groupId-test-1',
},
subscribeOptions: {
topics: ['topic-test-1'],
}
},
}
}
}
Complete configurable parameters include:
- connectionOptions: Kafka connection configuration, i.e., the parameters for new Kafka(connectionOptions)
- consumerOptions: Kafka consumer configuration, i.e., the parameters for kafka.consumer(consumerOptions)
- subscribeOptions: Kafka subscription configuration, i.e., the parameters for consumer.subscribe(subscribeOptions)
- consumerRunConfig: consumer run configuration, i.e., the parameters for consumer.run(consumerRunConfig)
For detailed explanations of these parameters, refer to the KafkaJS Consumer documentation.
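For example, a configuration that also tunes how the consumer runs might look like this (a sketch; the consumerRunConfig values shown are illustrative KafkaJS consumer.run options):
// src/config/config.default
export default {
  kafka: {
    consumer: {
      sub1: {
        connectionOptions: {
          clientId: 'my-app',
          brokers: ['localhost:9092'],
        },
        consumerOptions: {
          groupId: 'groupId-test-1',
        },
        subscribeOptions: {
          topics: ['topic-test-1'],
        },
        consumerRunConfig: {
          // disable auto-commit so offsets can be committed manually
          autoCommit: false,
          // consume up to 3 partitions concurrently
          partitionsConsumedConcurrently: 3,
        },
      },
    },
  },
}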
Reuse Kafka Instance
If you need to reuse a Kafka instance, you can specify it through the kafkaInstanceRef
field.
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumerOptions: {
groupId: 'groupId-test-1',
},
subscribeOptions: {
topics: ['topic-test-1'],
}
},
sub2: {
kafkaInstanceRef: 'sub1',
consumerOptions: {
groupId: 'groupId-test-2',
},
subscribeOptions: {
topics: ['topic-test-2'],
}
}
}
}
}
Note that sub1 and sub2 above are two different consumers, but they share the same Kafka instance, and sub2's groupId must be different from sub1's.
The equivalent Kafka SDK code looks like this:
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
const consumer1 = kafka.consumer({ groupId: 'groupId-test-1' });
const consumer2 = kafka.consumer({ groupId: 'groupId-test-2' });
Consumer Implementation
We can provide a standard consumer implementation in this directory, such as src/consumer/sub1.consumer.ts.
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload } from '@midwayjs/kafka';
@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
async eachMessage(payload: EachMessagePayload) {
// ...
}
}
Here, sub1 is the consumer name, corresponding to the sub1 consumer in the configuration above.
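For illustration, a fleshed-out eachMessage handler might look like the following (the logging is only an example; the payload fields come from KafkaJS):
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload } from '@midwayjs/kafka';

@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
  async eachMessage(payload: EachMessagePayload) {
    // each payload carries the topic, partition, and raw message (Buffers)
    const { topic, partition, message } = payload;
    const value = message.value?.toString();
    console.log(`received "${value}" from ${topic}#${partition} at offset ${message.offset}`);
  }
}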
You can also implement the eachBatch method to process messages in batches.
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka';
@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
async eachBatch(payload: EachBatchPayload) {
// ...
}
}
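With eachBatch you take over per-message bookkeeping yourself; below is a minimal sketch following the standard KafkaJS batch-processing pattern:
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka';

@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
  async eachBatch(payload: EachBatchPayload) {
    const { batch, resolveOffset, heartbeat, isRunning, isStale } = payload;
    for (const message of batch.messages) {
      // stop processing if the consumer is shutting down or the batch is stale
      if (!isRunning() || isStale()) break;
      // ... process the message ...
      resolveOffset(message.offset); // mark this offset as processed
      await heartbeat(); // keep the consumer session alive during long batches
    }
  }
}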
Message Context
Like other message subscription mechanisms, the message itself is passed through the Context field.
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka';
import { Inject } from '@midwayjs/core';
@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
@Inject()
ctx: Context;
async eachMessage(payload: EachMessagePayload) {
// ...
}
}
The Context field includes the following properties:
Property | Type | Description |
---|---|---|
ctx.payload | EachMessagePayload or EachBatchPayload | Message content |
ctx.consumer | Consumer | Consumer instance |
You can call Kafka's APIs through ctx.consumer, for example ctx.consumer.commitOffsets to manually commit offsets, or ctx.consumer.pause to pause consumption.
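For instance, a handler that commits offsets by hand might look like this (a sketch that assumes auto-commit has been disabled via consumerRunConfig; note that KafkaJS expects the next offset to be committed):
// src/consumer/sub1.consumer.ts
import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka';
import { Inject } from '@midwayjs/core';

@KafkaConsumer('sub1')
class Sub1Consumer implements IKafkaConsumer {
  @Inject()
  ctx: Context;

  async eachMessage(payload: EachMessagePayload) {
    // ... process the message ...
    // manually commit the offset following the one just processed
    await this.ctx.consumer.commitOffsets([
      {
        topic: payload.topic,
        partition: payload.partition,
        offset: (Number(payload.message.offset) + 1).toString(),
      },
    ]);
  }
}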
Producer
Basic Configuration
Producer instances also need to be created, and the configuration uses the service factory design pattern.
The configuration is as follows:
// src/config/config.default
export default {
kafka: {
producer: {
clients: {
pub1: {
// ...
},
pub2: {
// ...
}
}
}
}
}
Each Producer instance's configuration also includes connectionOptions and producerOptions.
// src/config/config.default
export default {
kafka: {
producer: {
clients: {
pub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
producerOptions: {
// ...
}
}
}
}
}
}
For specific parameters, refer to the KafkaJS Producer documentation.
Additionally, since Kafka Consumers and Producers can be created from the same Kafka instance, they can share one. If the Producer is created after the Consumer, it can also reuse the Kafka instance through the kafkaInstanceRef field.
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
}
},
producer: {
clients: {
pub1: {
kafkaInstanceRef: 'sub1',
}
}
}
}
}
Using Producer
There is no default instance for Producer. Since the service factory design pattern is used, it can be injected through @InjectClient()
.
// src/service/user.service.ts
import { Provide, InjectClient } from '@midwayjs/core';
import { KafkaProducerFactory, Producer } from '@midwayjs/kafka';
@Provide()
export class UserService {
@InjectClient(KafkaProducerFactory, 'pub1')
producer: Producer;
async invoke() {
await this.producer.send({
topic: 'topic-test-1',
messages: [{ key: 'message-key1', value: 'hello consumer 11 !' }],
});
}
}
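The injected producer is a standard KafkaJS Producer, so other KafkaJS methods are available as well; for example, sendBatch can publish to several topics in one call (a sketch; topics and messages are placeholders):
// inside a service method such as invoke() above
await this.producer.sendBatch({
  topicMessages: [
    { topic: 'topic-test-1', messages: [{ value: 'hello' }] },
    { topic: 'topic-test-2', messages: [{ value: 'world' }] },
  ],
});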
Admin
Kafka's Admin functionality can be used to create, delete, and inspect topics, and to view configurations, ACLs, and more.
Basic Configuration
Like Producer, Admin also uses the service factory design pattern.
// src/config/config.default
export default {
kafka: {
admin: {
clients: {
admin1: {
// ...
}
}
}
}
}
Similarly, Admin can also reuse the Kafka instance.
// src/config/config.default
export default {
kafka: {
consumer: {
sub1: {
connectionOptions: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
}
},
admin: {
clients: {
admin1: {
kafkaInstanceRef: 'sub1',
}
}
}
}
}
Using Admin
There is no default instance for Admin. Since the service factory design pattern is used, it can be injected through @InjectClient().
// src/service/admin.service.ts
import { Provide, InjectClient } from '@midwayjs/core';
import { KafkaAdminFactory, Admin } from '@midwayjs/kafka';
@Provide()
export class AdminService {
@InjectClient(KafkaAdminFactory, 'admin1')
admin: Admin;
}
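For example, a hypothetical service method could create a topic and list the topics in the cluster through the injected admin client (the topic name and settings are placeholders):
// src/service/admin.service.ts
import { Provide, InjectClient } from '@midwayjs/core';
import { KafkaAdminFactory, Admin } from '@midwayjs/kafka';

@Provide()
export class AdminService {
  @InjectClient(KafkaAdminFactory, 'admin1')
  admin: Admin;

  async setupTopic() {
    // create a topic if it does not exist yet
    await this.admin.createTopics({
      topics: [{ topic: 'topic-test-1', numPartitions: 1, replicationFactor: 1 }],
    });
    // list all topics in the cluster
    return this.admin.listTopics();
  }
}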
For more Admin usage methods, refer to the KafkaJS Admin documentation.
Component Logging
The Kafka component uses the kafkaLogger logger by default, which writes ctx.logger output to midway-kafka.log.
You can modify it through configuration.
// src/config/config.default
export default {
midwayLogger: {
clients: {
kafkaLogger: {
fileLogName: 'midway-kafka.log',
},
},
},
}
The output format of this log can also be configured separately.
export default {
kafka: {
// ...
contextLoggerFormat: info => {
return `${info.timestamp} ${info.LEVEL} ${info.pid} ${info.message}`;
},
}
}
Access KafkaJS Module
The KafkaJS module can be accessed through the KafkaJS field of @midwayjs/kafka.
import { KafkaJS } from '@midwayjs/kafka';
const { ConfigResourceTypes } = KafkaJS;
// ...
About the Partitioner Warning
If you are using KafkaJS v2.0.0, you may see the following warning:
2024-11-04 23:47:28.228 WARN 31729 KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" { timestamp: '2024-11-04T15:47:28.228Z', logger: 'kafkajs' }
This warning appears because KafkaJS v2.0.0 switched to a new default partitioner. If you accept the new partitioner behavior but want to silence the message, set the environment variable KAFKAJS_NO_PARTITIONER_WARNING=1.
Or explicitly declare the partitioner.
// src/config/config.default
import { KafkaJS } from '@midwayjs/kafka';
const { Partitioners } = KafkaJS;
export default {
kafka: {
producer: {
clients: {
pub1: {
// ...
producerOptions: {
// ...
// keep the new default partitioner explicitly
createPartitioner: Partitioners.DefaultPartitioner,
// or retain the pre-v2.0.0 behavior instead:
// createPartitioner: Partitioners.LegacyPartitioner,
},
},
},
},
}
}
It is recommended to check the KafkaJS v2.0.0 migration guide for more details.