Skip to main content
Version: 3.0.0

gRPC

gRPC 是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。

本篇内容演示了如何在 Midway 体系下,提供 gRPC 服务,以及调用 gRPC 服务的方法。

Midway 当前采用了最新的 gRPC 官方推荐的 @grpc/grpc-js 进行开发,并提供了一些工具包,用于快速发布服务和调用服务。

我们使用的模块为 @midwayjs/grpc ,既可以独立发布服务,又可以接入其它框架调用 gRPC 服务。

相关信息:

提供服务

描述
可用于标准项目
可用于 Serverless
可用于一体化

调用服务

描述
可用于标准项目
可用于 Serverless
可用于一体化

其他

描述
可作为主框架独立使用
可独立添加中间件

安装依赖

$ npm i @midwayjs/grpc@3 --save
$ npm i @midwayjs/grpc-helper --save-dev

或者在 package.json 中增加如下依赖后,重新安装。

{
"dependencies": {
"@midwayjs/grpc": "^3.0.0",
// ...
},
"devDependencies": {
"@midwayjs/grpc-helper": "^1.0.0",
// ...
}
}

开启组件

tip

不管是提供服务还是调用服务,都需要开启组件。

@midwayjs/grpc 可以作为独立主框架使用。

// src/configuration.ts
import { Configuration } from '@midwayjs/decorator';
import * as grpc from '@midwayjs/grpc';

@Configuration({
imports: [grpc],
// ...
})
export class MainConfiguration {
async onReady() {
// ...
}
}

也可以附加在其他的主框架下,比如 @midwayjs/koa

// src/configuration.ts
import { Configuration } from '@midwayjs/decorator';
import * as koa from '@midwayjs/koa';
import * as grpc from '@midwayjs/grpc';

@Configuration({
imports: [koa, grpc],
// ...
})
export class ContainerLifeCycle {
async onReady() {
// ...
}
}

目录结构

大致的目录结构如下,src/provider 是提供 gRPC 服务的目录。

.
├── package.json
├── proto ## proto 定义文件
│ └── helloworld.proto
├── src
│ ├── configuration.ts ## 入口配置文件
│ ├── interface.ts
│ └── provider ## gRPC 提供服务的文件
│ └── greeter.ts
├── test
├── bootstrap.js ## 服务启动入口
└── tsconfig.json

定义服务接口

在微服务中,定义一个服务需要特定的接口定义语言(IDL)来完成,在 gRPC中 默认使用 Protocol Buffers 作为序列化协议。

序列化协议独立于语言和平台,提供了多种语言的实现,Java,C++,Go 等等,每一种实现都包含了相应语言的编译器和库文件。所以 gRPC 是一个提供和调用都可以跨语言的服务框架。

一个gRPC服务的大体架构可以用官网上的一幅图表示。

Protocol Buffers 协议的文件,默认的后缀为 .proto 。.proto后缀的IDL文件,并通过其编译器生成特定语言的数据结构、服务端接口和客户端Stub代码。

info

由于 proto 文件可以跨语言使用,为了方便共享,我们一般将 proto 文件放在 src 目录外侧,方便其他工具复制分发。

下面是一个基础的 proto/helloworld.proto 文件。

syntax = "proto3";

package helloworld;

service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

proto3 表示的是第三版的 protobuf 协议,是 gRPC 目前推荐的版本,“语法简单,功能更全”。

我们可以用 service 格式,定义服务体,其中可以包含方法。同时,我们可以更加细致的通过 message 描述服务具体的请求参数和响应参数。

我们可以从 Google 的官网文档 中查看更多细节。

info

大家会看到,这和 Java 中的 Class 非常相像,每个结构就相当于 Java 中的一个类。

编写 proto 文件

现在我们再来看之前的服务,是不是就很好理解了。

syntax = "proto3";

package helloworld;

// 服务的定义
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 服务的请求参数
message HelloRequest {
string name = 1;
}

// 服务的响应参数
message HelloReply {
string message = 1;
}

我们定义了一个名为 Greeter 的服务,包含一个 HelloRequest 结构的请求体,以及返回 HelloReply 结构的响应体。

接下去,我们将对这个服务给大家做演示。

生成代码定义

传统的 gRPC 框架,需要用户手动编写 proto 文件,以及生成 js 服务,最后再根据 js 生成的服务再编写实现,在 Midway 体系下,我们提供了一个 grpc-helper 工具包来加速这个过程。

如果没有安装,可以先安装。

$ npm i @midwayjs/grpc-helper --save-dev

grpc-helper 工具的作用,是将用户提供的 proto 文件,生成对应可读的 ts interface 文件。

我们可以添加一个脚本,方便这个过程。

{
"scripts": {
"generate": "tsproto --path proto --output src/domain"
}
}

然后执行 npm run generate

上述命令执行后,会在代码的 src/domain 目录中生成 proto 文件对应的服务接口定义。

info

不管是提供 gRPC 服务还是调用 gRPC 服务,都要先生成定义。

生成的代码如下,包含有一个命名空间(namespace),以及命名空间下的两个 TypeScript Interface, Greeter 用于编写服务端实现, GreeterClient 用于编写客户端实现。

/**
* This file is auto-generated by grpc-helper
*/

import * as grpc from '@midwayjs/grpc';

// 生成的命名空间
export namespace helloworld {

// 服务端使用的定义
export interface Greeter {
// Sends a greeting
sayHello(data: HelloRequest): Promise<HelloReply>;
}

// 客户端使用的定义
export interface GreeterClient {
// Sends a greeting
sayHello(options?: grpc.IClientOptions): grpc.IClientUnaryService<HelloRequest, HelloReply>;
}

// 请求体结构
export interface HelloRequest {
name?: string;
}

// 响应体结构
export interface HelloReply {
message?: string;
}
}

info

每当 proto 文件被修改时,就需要重新生成对应的服务定义,然后将对应的方法实现。

提供 gRPC 服务(Provider)

编写服务提供方(Provider)

src/provider 目录中,我们创建 greeter.ts ,内容如下。

import {
MSProviderType,
Provider,
Provide,
GrpcMethod,
} from '@midwayjs/decorator';
import { helloworld } from '../domain/helloworld';

/**
* 实现 helloworld.Greeter 接口的服务
*/
@Provider(MSProviderType.GRPC, { package: 'helloworld' })
export class Greeter implements helloworld.Greeter {

@GrpcMethod()
async sayHello(request: helloworld.HelloRequest) {
return { message: 'Hello ' + request.name };
}
}

info

注意,@Provider 装饰器和 @Provide 装饰器不同,前者用于提供服务,后者用于依赖注入容器扫描标识的类。

我们使用 @Provider 暴露出一个 RPC 服务, @Provider 的第一个参数为 RPC 服务类型,这个参数是个枚举,这里选择 GRPC 类型。

@Provider 的第二个参数为 RPC 服务的元数据,这里指代的是 gRPC 服务的元数据。这里需要写入 gRPC 的 package 字段,即 proto 文件中的 package 字段(这里的字段用于和 proto 文件加载后的字段做对应)。

对于普通的 gRPC 服务接口(UnaryCall),我们只需要使用 @GrpcMethod() 装饰器修饰即可。修饰的方法即为服务定义本身,入参为 proto 中定义好的入参,return 值即为定义好的响应体。

info

注意,生成的 Interface 是为了更好的编写服务代码,规范结构,请务必按照定义编写。

配置服务

这里启动需要用到项目根目录 bootstrap.js 独立文件。代码和其他框架初始化类似,只是这里的框架包是 @midwayjs/grpc

内容如下(函数形式的配置):

// src/config/config.default
import { MidwayAppInfo, MidwayConfig } from '@midwayjs/core';

export default (appInfo: MidwayAppInfo): MidwayConfig => {
return {
// ...
grpcServer: {
services: [
{
protoPath: join(appInfo.appDir, 'proto/hero.proto'),
package: 'hero',
},
{
protoPath: join(appInfo.appDir, 'proto/helloworld.proto'),
package: 'helloworld',
}
],
}
};
}

services 字段是数组,意味着 Midway 项目可以同时发布多个 gRPC 服务。每个 service 的结构为:

属性类型描述
protoPathstring必选,proto 文件的绝对路径
packagestring必选,服务对应的 package

除了 Service 配置之外,还有一些其他的配置。

属性类型描述
loaderOptionsObject可选,proto file loader 的 options
credentialsServerCredentials可选,grpc Server binding 时的 credentials 参数选项
serverOptionsChannelOptions可选,grpc Server 的 自定义 options

提供安全证书

可以通过 credentials 参数传递安全证书。

// src/config/config.default
import { MidwayAppInfo, MidwayConfig } from '@midwayjs/core';
import { ServerCredentials } from '@midwayjs/grpc';
import { readFileSync } from 'fs';
import { join } from 'path';

const cert = readFileSync(join(__dirname, './cert/server.crt'));
const pem = readFileSync(join(__dirname, './cert/server.pem'));
const key = readFileSync(join(__dirname, './cert/server.key'));

export default (appInfo: MidwayAppInfo): MidwayConfig => {
return {
// ...
grpcServer: {
// ...
credentials: ServerCredentials.createSsl(cert, [{ private_key: key, cert_chain: pem }]);
}
};
}

编写单元测试

@midwayjs/grpc 库提供了一个 createGRPCConsumer 方法,用于实时调用客户端,一般我们用这个方法做测试。

caution

这个方法每次调用会实时连接,不建议将该方法用在生产环境。

在测试中写法如下。

import { createApp, close } from '@midwayjs/mock';
import { Framework, createGRPCConsumer } from '@midwayjs/grpc';
import { join } from 'path';
import { helloworld } from '../src/domain/helloworld';

describe('test/index.test.ts', () => {

it('should create multiple grpc service in one server', async () => {
const baseDir = join(__dirname, '../');

// 创建服务
const app = await createApp<Framework>();

// 调用服务
const service = await createGRPCConsumer<helloworld.GreeterClient>({
package: 'helloworld',
protoPath: join(baseDir, 'proto', 'helloworld.proto'),
url: 'localhost:6565'
});

const result = await service.sayHello().sendMessage({
name: 'harry'
});

expect(result.message).toEqual('Hello harry');
await close(app);
});

});

调用 gRPC 服务(Consumer)

我们编写一个 gRPC 服务来调用上面的暴露的服务。

info

事实上,你可以在 Web 的 Controller,或者 Service 等其他地方来调用,这里只是做一个示例。

调用配置

你需要在 src/config/config.default.ts 中增加你需要调用的目标服务以及它的 proto 文件信息。

比如,这里我们填写了上面暴露的服务本身,以及该服务的 proto,包名等信息(函数形式)。

// src/config/config.default
import { MidwayAppInfo, MidwayConfig } from '@midwayjs/core';

export default (appInfo: MidwayAppInfo): MidwayConfig => {
return {
// ...
grpc: {
services: [
{
url: 'localhost:6565',
protoPath: join(appInfo.appDir, 'proto/helloworld.proto'),
package: 'helloworld',
},
],
},
};
}

代码调用

配置完后,我们就可以在代码里调用了。

@midwayjs/grpc 提供了 clients ,可以方便的获取到已配置的服务。我们只需要在需要注入的地方,注入这个对象即可。

比如:

import {
Provide,
Inject,
} from '@midwayjs/decorator';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';

@Provide()
export class UserService {
@Inject()
grpcClients: Clients;

}

我们通过 clients 获取到对方服务的客户端实例,然后调用即可。

import {
Provide,
Inject,
} from '@midwayjs/decorator';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';

@Provide()
export class UserService {
@Inject()
grpcClients: Clients;

async invoke() {
// 获取服务
const greeterService = this.grpcClients.getService<helloworld.GreeterClient>(
'helloworld.Greeter'
);

// 调用服务
const result = await greeterService.sayHello()
.sendMessage({
name: 'harry'
});

// 返回结果
return result;
}

}

我们也可以利用 @Init 装饰器,将需要调用的服务缓存到属性上。这样可以在其他方法调用时复用。

示例如下。

import {
GrpcMethod,
MSProviderType,
Provider,
Inject,
Init,
} from '@midwayjs/decorator';
import { helloworld, hero } from '../interface';
import { Clients } from '@midwayjs/grpc';

@Provider(MSProviderType.GRPC, { package: 'hero' })
export class HeroService implements hero.HeroService {
// 注入客户端
@Inject()
grpcClients: Clients;

greeterService: helloworld.GreeterClient;

@Init()
async init() {
// 赋值一个服务实例
this.greeterService = this.grpcClients.getService<helloworld.GreeterClient>(
'helloworld.Greeter'
);
}

@GrpcMethod()
async findOne(data) {
// 调用服务
const result = await greeterService.sayHello()
.sendMessage({
name: 'harry'
});

// 返回结果
return result;
}
}

流式服务

gRPC 的流式服务用于减少连接,让服务端或者客户端不需要等待即可执行任务,从而提高执行效率。

gRPC 的流式服务分为三种,以服务端角度来说,为

  • 服务端接收流(客户端推)
  • 服务端响应流(服务端推)
  • 双向流

下面我们将一一介绍。

流式 proto 文件

流式的 proto 文件写法不同,需要在希望使用流式的地方将参数标记为 stream


syntax = "proto3";

package math;

message AddArgs {
int32 id = 1;
int32 num = 2;
}

message Num {
int32 id = 1;
int32 num = 2;
}

service Math {
rpc Add (AddArgs) returns (Num) {
}

// 双向流
rpc AddMore (stream AddArgs) returns (stream Num) {
}

// 服务端往客户端推
rpc SumMany (AddArgs) returns (stream Num) {
}

// 客户端往服务端推
rpc AddMany (stream AddArgs) returns (Num) {
}
}

该服务生成的接口定义为:

import {
IClientDuplexStreamService,
IClientReadableStreamService,
IClientUnaryService,
IClientWritableStreamService,
IClientOptions,
} from '@midwayjs/grpc';

export namespace math {
export interface AddArgs {
id?: number;
num?: number;
}
export interface Num {
id?: number;
num?: number;
}

/**
* server interface
*/
export interface Math {
add(data: AddArgs): Promise<Num>;
addMore(data: AddArgs): Promise<void>;
// 服务端推,客户端读
sumMany(data: AddArgs): Promise<void>
// 客户端端推,服务端读
addMany(num: AddArgs): Promise<void>;
}

/**
* client interface
*/
export interface MathClient {
add(options?: IClientOptions): IClientUnaryService<AddArgs, Num>;
addMore(options?: IClientOptions): IClientDuplexStreamService<AddArgs, Num>;
// 服务端推,客户端读
sumMany(options?: IClientOptions): IClientReadableStreamService<AddArgs, Num>;
// 客户端端推,服务端读
addMany(options?: IClientOptions): IClientWritableStreamService<AddArgs, Num>;
}
}

服务端推送

客户端调用一次,服务端可以多次返回。通过 @GrpcMethod() 的参数来标识流式类型。

可用的类型为:

  • GrpcStreamTypeEnum.WRITEABLE 服务端输出流(单工)
  • GrpcStreamTypeEnum.READABLE 客户端输出流(单工),服务端接受多次
  • GrpcStreamTypeEnum.DUPLEX 双工流

服务端示例如下:

import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provider } from '@midwayjs/decorator';
import { Context, Metadata } from '@midwayjs/grpc';
import { math } from '../interface';

/**
*/
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {

@Inject()
ctx: Context;

@GrpcMethod({type: GrpcStreamTypeEnum.WRITEABLE })
async sumMany(args: math.AddArgs) {
this.ctx.write({
num: 1 + args.num
});
this.ctx.write({
num: 2 + args.num
});
this.ctx.write({
num: 3 + args.num
});

this.ctx.end();
}

// ...
}

服务端使用 ctx.write 方法来返回数据,由于是服务端流,可以返回多次。

返回结束后,请使用 ctx.end() 方法关闭流。

客户端,调用一次,接受多次数据。

比如下面的累加逻辑。

Promise 写法,会等待服务端数据都返回再做处理。

// 服务端推送
let total = 0;
let result = await service.sumMany().sendMessage({
num: 1,
});

result.forEach(data => {
total += data.num;
});

// total = 9;

事件写法,实时处理。

// 服务端推送
let call = service.sumMany().getCall();

call.on('data', data => {
// do something
});

call.sendMessage({
num: 1,
});

客户端推送

客户端调用多次,服务端接收多次数据,返回一个结果。通过 @GrpcMethod({type: GrpcStreamTypeEnum.READABLE}) 的参数来标识流式类型。

服务端示例如下:

import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provider } from '@midwayjs/decorator';
import { Context, Metadata } from '@midwayjs/grpc';
import { math } from '../interface';

/**
*/
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {

sumDataList: number[] = [];

@Inject()
ctx: Context;

@GrpcMethod({type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' })
async addMany(data: math.Num) {
this.sumDataList.push(data);
}

async sumEnd(): Promise<math.Num> {
const total = this.sumDataList.reduce((pre, cur) => {
return {
num: pre.num + cur.num,
}
});
return total;
}

// ...
}

客户端每次调用,都会触发一次 addMany 方法。

在客户端发送 end 事件之后,会调用 @GrpcMethod 装饰器上的 onEnd 参数指定的方法,该方法的返回值即为最后客户端拿到的值。

客户端示例如下:

// 客户端推送
const data = await service.addMany()
.sendMessage({num: 1})
.sendMessage({num: 2})
.sendMessage({num: 3})
.end();

// data.num = 6

双向流

客户端可以调用多次,服务端也可以接收多次数据,返回多个结果,类似于传统的 TCP 通信。通过 @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX}) 的参数来标识双工流式类型。

服务端示例如下:

import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provider } from '@midwayjs/decorator';
import { Context, Metadata } from '@midwayjs/grpc';
import { math } from '../interface';

/**
*/
@Provider(MSProviderType.GRPC, { package: 'math' })
export class Math implements math.Math {

@Inject()
ctx: Context;

@GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' })
async addMore(message: math.AddArgs) {
this.ctx.write({
id: message.id,
num: message.num + 10,
});
}

async duplexEnd() {
console.log('got client end message');
}
// ...
}

服务端可以随时使用 ctx.write 返回数据,也可以使用 ctx.end 来关闭流。

客户端示例:

对于双工通信的客户端,由于无法保证调用、返回的顺序,我们需要使用监听的模式来消费结果。

const clientStream = service.addMore().getCall();

let total = 0;
let idx = 0;

duplexCall.on('data', (data: math.Num) => {
total += data.num;
idx++;
if (idx === 2) {
duplexCall.end();
// total => 29
}
});

duplexCall.write({
num: 3,
});

duplexCall.write({
num: 6,
});

如果希望保证调用顺序,我们也提供了保证顺序的双向流调用方法,但是需要在 proto 中定义一个固定的 id,来确保顺序。

比如我们的 Math.proto,对每个入参和出参,都增加了一个固定的 id,所以可以固定顺序。


syntax = "proto3";

package math;

message AddArgs {
int32 id = 1; // 这里的 id 名字是固定的
int32 num = 2;
}

message Num {
int32 id = 1; // 这里的 id 名字是固定的
int32 num = 2;
}

service Math {
rpc Add (AddArgs) returns (Num) {
}

rpc AddMore (stream AddArgs) returns (stream Num) {
}

// 服务端往客户端推
rpc SumMany (AddArgs) returns (stream Num) {
}

// 客户端往服务端推
rpc AddMany (stream AddArgs) returns (Num) {
}
}

固定顺序的客户端调用方式如下:

// 保证顺序的双向流
const t = service.addMore();

const result4 = await new Promise<number>((resolve, reject) => {

let total = 0;

// 第一次调用和返回
t.sendMessage({
num: 2
})
.then(res => {
expect(res.num).toEqual(12);
total += res.num;
})
.catch(err => console.error(err));

// 第二次调用和返回
t.sendMessage({
num: 5
}).then(res => {
expect(res.num).toEqual(15);
total += res.num;
resolve(total);
})
.catch(err => console.error(err));

t.end();
});

// result4 => 27

默认的 id 为 id ,如果服务端定义不同,需要修改,可以在客户端调用时传递。

// 保证顺序的双向流
const t = service.addMore({
messageKey: 'uid'
});

元数据(Metadata)

gRPC 的元数据等价于 HTTP 的上下文。

服务端通过 ctx.sendMetadata 方法返回元数据,也可以通过 ctx.metadata 获取客户端传递的元数据。

import {
MSProviderType,
Provider,
GrpcMethod,
} from '@midwayjs/decorator';
import { helloworld } from '../domain/helloworld';
import { Context, Metadata } from '@midwayjs/grpc';

/**
* 实现 helloworld.Greeter 接口的服务
*/
@Provider(MSProviderType.GRPC, { package: 'helloworld' })
export class Greeter implements helloworld.Greeter {

@Inject()
ctx: Context;

@GrpcMethod()
async sayHello(request: helloworld.HelloRequest) {

// 客户端传递的元数据
console.log(this.ctx.metadata);

// 创建元数据
const meta = new Metadata();
this.ctx.metadata.add('xxx', 'bbb');
this.ctx.sendMetadata(meta);

return { message: 'Hello ' + request.name };
}
}

客户端通过方法的 options 参数传递元数据。

import { Metadata } from '@midwayjs/grpc';

const meta = new Metadata();
meta.add('key', 'value');

const result = await service.sayHello({
metadata: meta,
}).sendMessage({
name: 'harry'
});

获取元数据相对麻烦一些。

普通一元调用(UnaryCall)获取元数据需要使用 sendMessageWithCallback 方法。

const call = service.sayHello().sendMessageWithCallback({
name: 'zhangting'
}, (err) => {
if (err) {
reject(err);
}
});
call.on('metadata', (meta) => {
// output meta
});

其他流式服务,可以通过 getCall() 方法获取原始客户端流对象,从而直接订阅。

// 获取服务,注意,这里没有 await
const call = service.addMany().getCall();
call.on('metadata', (meta) => {
// output meta
});

超时处理

我们可以在调用服务时传递参数,单位毫秒。

const result = await service.sayHello({
timeout: 5000
}).sendMessage({
name: 'harry'
});