Skip to main content

gRPC

gRPC 是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。 ​

本篇内容演示了如何在 Midway 体系下,提供 gRPC 服务,以及调用 gRPC 服务的方法。

Midway 当前采用了最新的 gRPC 官方推荐的 @grpc/grpc-js 进行开发,并提供了一些工具包,用于快速发布服务和调用服务。

我们使用的模块为 @midwayjs/grpc ,既是一个框架(可以独立发布服务),又是一个组件(可以接入其它框架调用 gRPC 服务)。 ​

创建示例#

$ npm -v
# 如果是 npm v6$ npm init midway --type=grpc my_midway_app
# 如果是 npm v7$ npm init midway -- --type=grpc my_midway_app

此示例包含一个 gRPC 服务。

目录结构#

.├── package.json├── proto                                                   ## proto 定义文件│   └── helloworld.proto├── src│   ├── configuration.ts                    ## 入口配置文件│   ├── interface.ts│   └── provider                                    ## gRPC 提供服务的文件│       └── greeter.ts├── test├── bootstrap.js                                    ## 服务启动入口└── tsconfig.json

定义服务接口#

在微服务中,定义一个服务需要特定的接口定义语言(IDL)来完成,在 gRPC 中 默认使用 Protocol Buffers 作为序列化协议。

序列化协议独立于语言和平台,提供了多种语言的实现,Java,C++,Go 等等,每一种实现都包含了相应语言的编译器和库文件。所以 gRPC 是一个提供和调用都可以跨语言的服务框架。

一个 gRPC 服务的大体架构可以用官网上的一幅图表示。

Protocol Buffers 协议的文件,默认的后缀为 .proto 。.proto 后缀的 IDL 文件,并通过其编译器生成特定语言的数据结构、服务端接口和客户端 Stub 代码。

info

由于 proto 文件可以跨语言使用,为了方便共享,我们一般将 proto 文件放在 src 目录外侧,方便其他工具复制分发。

下面是一个基础的 proto/helloworld.proto  文件。

syntax = "proto3";
package helloworld;
service Greeter {  // Sends a greeting  rpc SayHello (HelloRequest) returns (HelloReply) {}}
message HelloRequest {  string name = 1;}
message HelloReply {  string message = 1;}

proto3 表示的是第三版的 protobuf 协议,是 gRPC 目前推荐的版本,“语法简单,功能更全”。 ​

我们可以用 service  格式,定义服务体,其中可以包含方法。同时,我们可以更加细致的通过 message  描述服务具体的请求参数和响应参数。

我们可以从 Google 的官网文档 中查看更多细节。

info

大家会看到,这和 Java 中的 Class 非常相像,每个结构就相当于 Java 中的一个类。

编写 proto 文件#

现在我们再来看之前的服务,是不是就很好理解了。

syntax = "proto3";
package helloworld;
// 服务的定义service Greeter {  // Sends a greeting  rpc SayHello (HelloRequest) returns (HelloReply) {}}
// 服务的请求参数message HelloRequest {  string name = 1;}
// 服务的响应参数message HelloReply {  string message = 1;}

我们定义了一个名为 Greeter  的服务,包含一个 HelloRequest  结构的请求体,以及返回 HelloReply  结构的响应体。

接下去,我们将对这个服务给大家做演示。 ​

生成代码定义#

传统的 gRPC 框架,需要用户手动编写 proto 文件,以及生成 js 服务,最后再根据 js 生成的服务再编写实现,在 Midway 体系下,我们提供了一个 grpc-helper 工具包来加速这个过程。

如果没有安装,可以先安装(脚手架示例中已带)。

$ npm i @midwayjs/grpc-helper --save-dev

grpc-helper 工具的作用,是将用户提供的 proto 文件,生成对应可读的 ts interface 文件。 ​

我们可以添加一个脚本,方便这个过程。

{  "scripts": {    "generate": "tsproto --path proto --output src/domain"  }}

然后执行 npm run generate 。

上述命令执行后,会在代码的 src/domain  目录中生成 proto 文件对应的服务接口定义。

info

不管是提供 gRPC 服务还是调用 gRPC 服务,都要先生成定义。

生成的代码如下,包含有一个命名空间(namespace),以及命名空间下的两个 TypeScript Interface, Greeter  用于编写服务端实现, GreeterClient  用于编写客户端实现。

/** * This file is auto-generated by grpc-helper */
import * as grpc from '@midwayjs/grpc';
// 生成的命名空间export namespace helloworld {  // 服务端使用的定义  export interface Greeter {    // Sends a greeting    sayHello(data: HelloRequest): Promise<HelloReply>;  }
  // 客户端使用的定义  export interface GreeterClient {    // Sends a greeting    sayHello(options?: grpc.IClientOptions): grpc.IClientUnaryService<HelloRequest, HelloReply>;  }
  // 请求体结构  export interface HelloRequest {    name?: string;  }
  // 响应体结构  export interface HelloReply {    message?: string;  }}
info

每当 proto 文件被修改时,就需要重新生成对应的服务定义,然后将对应的方法实现。

提供 gRPC 服务(Provider)#

编写生产者(Provider)#

src/provider  目录中,我们创建 greeter.ts ,内容如下。

import { MSProviderType, Provider, Provide, GrpcMethod } from '@midwayjs/decorator';import { helloworld } from '../domain/helloworld';
/** * 实现 helloworld.Greeter 接口的服务 */@Provide()@Provider(MSProviderType.GRPC, { package: 'helloworld' })export class Greeter implements helloworld.Greeter {  @GrpcMethod()  async sayHello(request: helloworld.HelloRequest) {    return { message: 'Hello ' + request.name };  }}
info

注意,@Provider 装饰器和 @Provide 装饰器不同,前者用于提供服务,后者用于依赖注入容器扫描标识的类。

我们使用 @Provider  暴露出一个 RPC 服务, @Provider  的第一个参数为 RPC 服务类型,这个参数是个枚举,这里选择 GRPC 类型。

@Provider 的第二个参数为 RPC 服务的元数据,这里指代的是 gRPC 服务的元数据。这里需要写入 gRPC 的 package 字段,即 proto 文件中的 package 字段(这里的字段用于和 proto 文件加载后的字段做对应)。

对于普通的 gRPC 服务接口(UnaryCall),我们只需要使用 @GrpcMethod()  装饰器修饰即可。修饰的方法即为服务定义本身,入参为 proto 中定义好的入参,return 值即为定义好的响应体。

info

注意,生成的 Interface 是为了更好的编写服务代码,规范结构,请务必按照定义编写。

启动 gRPC 服务#

这里启动需要用到项目根目录 bootstrap.js  独立文件。代码和其他框架初始化类似,只是这里的框架包是 @midwayjs/grpc 。

内容如下:

// 获取框架const { Framework } = require('@midwayjs/grpc');const { join } = require('path');
// 初始化框架const grpcService = new Framework().configure({  services: [    {      protoPath: join(__dirname, 'proto/helloworld.proto'),      package: 'helloworld',    },  ],});
// 使用 bootstrap 启动const { Bootstrap } = require('@midwayjs/bootstrap');Bootstrap.load(grpcService).run();

我们已经将启动命令写到了 start 脚本中,执行 npm run start  即可。

"scripts": {  "start": "NODE_ENV=production node ./bootstrap.js",},
info

在部署前,需要执行 npm run build 将 ts 代码编译为 js。

框架选项#

@midwayjs/grpc  作为框架启动时,可以传递的参数如下:

urlstring可选,gRPC 服务连接字符串,默认为 localhost:6565
servicesIGRPCServiceOptions[]必选,数组,需要暴露的 gRPC 服务信息,每个服务对应一个 proto 文件
loaderOptionsobject可选,使用  @grpc/proto-loader 加载的选项,具体参考这里,默认为

{   keepCase: true,  longs: String,  enums: String,  defaults: true,  oneofs: true, } | | credentials | ServerCredentials | 可选,服务凭证,值参考这里,默认值为 ServerCredentials.createInsecure() |

services 字段是数组,意味着 Midway 项目可以同时发布多个 gRPC 服务。每个 service 的结构为: ​

protoPathstring必选,proto 文件的绝对路径
packagestring必选,服务对应的 package

编写单元测试#

@midwayjs/grpc  库提供了一个 createGRPCConsumer  方法,用于实时调用客户端,一般我们用这个方法做测试。

caution

这个方法每次调用会实时连接,不建议将该方法用在生产环境。

在测试中写法如下。

import { createApp, close } from '@midwayjs/mock';import { Framework, createGRPCConsumer } from '@midwayjs/grpc';import { join } from 'path';import { helloworld } from '../src/domain/helloworld';
describe('test/index.test.ts', () => {  it('should create multiple grpc service in one server', async () => {    const baseDir = join(__dirname, '../');
    // 创建服务    const app = await createApp<Framework>(baseDir, {      services: [        {          protoPath: join(baseDir, 'proto', 'helloworld.proto'),          package: 'helloworld',        },      ],    });
    // 调用服务    const service = await createGRPCConsumer<helloworld.GreeterClient>({      package: 'helloworld',      protoPath: join(baseDir, 'proto', 'helloworld.proto'),      url: 'localhost:6565',    });
    const result = await service.sayHello().sendMessage({      name: 'harry',    });
    expect(result.message).toEqual('Hello harry');    await close(app);  });});

调用 gRPC 服务(Consumer)#

我们编写一个 gRPC 服务来调用上面的暴露的服务。

info

事实上,你可以在 Web 的 Controller,或者 Service 等其他地方来调用,这里只是做一个示例。

增加组件#

@midwayjs/grpc  库即是 Framework,又是组件,在作为组件引入时,需要在 src/configuration.ts  中配置。

// src/configuration.ts
import { Configuration } from '@midwayjs/decorator';import * as grpc from '@midwayjs/grpc';import { join } from 'path';
@Configuration({  imports: [grpc],  importConfigs: [join(__dirname, './config')],})export class AutoConfiguration {}

提供调用配置#

你需要在 src/config/config.default.ts  中增加你需要调用的目标服务以及它的 proto 文件信息。

比如,这里我们填写了上面暴露的服务本身,以及该服务的 proto,包名等信息。

// src/config/config.default.ts
import { DefaultConfig } from '@midwayjs/grpc';import { join } from 'path';
export const grpc = {  services: [    {      url: 'localhost:6565',      protoPath: join(__dirname, '../../proto/helloworld.proto'),      package: 'helloworld',    },  ],} as DefaultConfig;

代码调用#

配置完后,我们就可以在代码里调用了。

@midwayjs/grpc  提供了 clients ,可以方便的获取到已配置的服务。我们只需要在需要注入的地方,注入这个对象即可。

比如:

import { Provide, Inject } from '@midwayjs/decorator';import { helloworld, hero } from '../interface';import { Clients } from '@midwayjs/grpc';
@Provide()export class UserService {  @Inject()  grpcClients: Clients;}

我们通过 clients  获取到对方服务的客户端实例,然后调用即可。

import { Provide, Inject } from '@midwayjs/decorator';import { helloworld, hero } from '../interface';import { Clients } from '@midwayjs/grpc';
@Provide()export class UserService {  @Inject()  grpcClients: Clients;
  async invoke() {    // 获取服务    const greeterService = this.grpcClients.getService<helloworld.GreeterClient>('helloworld.Greeter');
    // 调用服务    const result = await greeterService.sayHello().sendMessage({      name: 'harry',    });
    // 返回结果    return result;  }}

我们也可以利用 @Init  装饰器,将需要调用的服务缓存到属性上。这样可以在其他方法调用时复用。

示例如下。

import { GrpcMethod, MSProviderType, Provider, Provide, Inject, Init } from '@midwayjs/decorator';import { helloworld, hero } from '../interface';import { Clients } from '@midwayjs/grpc';
@Provide()@Provider(MSProviderType.GRPC, { package: 'hero' })export class HeroService implements hero.HeroService {  // 注入客户端  @Inject()  grpcClients: Clients;
  greeterService: helloworld.Greeter;
  @Init()  async init() {    // 赋值一个服务实例    this.greeterService = this.grpcClients.getService<helloworld.GreeterClient>('helloworld.Greeter');  }
  @GrpcMethod()  async findOne(data) {    // 调用服务    const result = await greeterService.sayHello().sendMessage({      name: 'harry',    });
    // 返回结果    return result;  }}

流式服务#

gRPC 的流式服务用于减少连接,让服务端或者客户端不需要等待即可执行任务,从而提高执行效率。

gRPC 的流式服务分为三种,以服务端角度来说,为

  • 服务端接收流(客户端推)
  • 服务端响应流(服务端推)
  • 双向流

下面我们将一一介绍。

流式 proto 文件#

流式的 proto 文件写法不同,需要在希望使用流式的地方将参数标记为 stream 。


syntax = "proto3";
package math;
message AddArgs {  int32 id = 1;  int32 num = 2;}
message Num {  int32 id = 1;  int32 num = 2;}
service Math {  rpc Add (AddArgs) returns (Num) {  }
    // 双向流  rpc AddMore (stream AddArgs) returns (stream Num) {  }
  // 服务端往客户端推  rpc SumMany (AddArgs) returns (stream Num) {  }
  // 客户端往服务端推  rpc AddMany (stream AddArgs) returns (Num) {  }}

该服务生成的接口定义为:

import {  IClientDuplexStreamService,  IClientReadableStreamService,  IClientUnaryService,  IClientWritableStreamService,  IClientOptions,} from '@midwayjs/grpc';
export namespace math {  export interface AddArgs {    id?: number;    num?: number;  }  export interface Num {    id?: number;    num?: number;  }
  /**   * server interface   */  export interface Math {    add(data: AddArgs): Promise<Num>;    addMore(data: AddArgs): Promise<void>;    // 服务端推,客户端读    sumMany(data: AddArgs): Promise<void>;    // 客户端端推,服务端读    addMany(num: AddArgs): Promise<void>;  }
  /**   * client interface   */  export interface MathClient {    add(options?: IClientOptions): IClientUnaryService<AddArgs, Num>;    addMore(options?: IClientOptions): IClientDuplexStreamService<AddArgs, Num>;    // 服务端推,客户端读    sumMany(options?: IClientOptions): IClientReadableStreamService<AddArgs, Num>;    // 客户端端推,服务端读    addMany(options?: IClientOptions): IClientWritableStreamService<AddArgs, Num>;  }}

服务端推送#

客户端调用一次,服务端可以多次返回。通过 @GrpcMethod()  的参数来标识流式类型。

可用的类型为:

  • GrpcStreamTypeEnum.WRITEABLE  服务端输出流(单工)
  • GrpcStreamTypeEnum.READABLE  客户端输出流(单工),服务端接受多次
  • GrpcStreamTypeEnum.DUPLEX  双工流

服务端示例如下:

import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator';import { Context } from '@midwayjs/grpc';import { math } from '../interface';import { Metadata } from '@grpc/grpc-js';
/** */@Provide()@Provider(MSProviderType.GRPC, { package: 'math' })export class Math implements math.Math {  @Inject()  ctx: Context;
  @GrpcMethod({ type: GrpcStreamTypeEnum.WRITEABLE })  async sumMany(args: math.AddArgs) {    this.ctx.write({      num: 1 + args.num,    });    this.ctx.write({      num: 2 + args.num,    });    this.ctx.write({      num: 3 + args.num,    });
    this.ctx.end();  }
  // ...}

服务端使用 ctx.write  方法来返回数据,由于是服务端流,可以返回多次。

返回结束后,请使用 ctx.end()  方法关闭流。

客户端,调用一次,接受多次数据。

比如下面的累加逻辑。

Promise 写法,会等待服务端数据都返回再做处理。

// 服务端推送let total = 0;let result = await service.sumMany().sendMessage({  num: 1,});
result.forEach((data) => {  total += data.num;});
// total = 9;

事件写法,实时处理。

// 服务端推送let call = service.sumMany().getCall();
call.on('data', (data) => {  // do something});
call.sendMessage({  num: 1,});

客户端推送#

客户端调用多次,服务端接收多次数据,返回一个结果。通过 @GrpcMethod({type: GrpcStreamTypeEnum.READABLE}) 的参数来标识流式类型。

服务端示例如下:

import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator';import { Context } from '@midwayjs/grpc';import { math } from '../interface';import { Metadata } from '@grpc/grpc-js';
/** */@Provide()@Provider(MSProviderType.GRPC, { package: 'math' })export class Math implements math.Math {  sumDataList: number[] = [];
  @Inject()  ctx: Context;
  @GrpcMethod({ type: GrpcStreamTypeEnum.READABLE, onEnd: 'sumEnd' })  async addMany(data: math.Num) {    this.sumDataList.push(data);  }
  async sumEnd(): Promise<math.Num> {    const total = this.sumDataList.reduce((pre, cur) => {      return {        num: pre.num + cur.num,      };    });    return total;  }
  // ...}

客户端每次调用,都会触发一次 addMany 方法。

在客户端发送 end  事件之后,会调用 @GrpcMethod  装饰器上的 onEnd  参数指定的方法,该方法的返回值即为最后客户端拿到的值。

客户端示例如下:

// 客户端推送const data = await service.addMany().sendMessage({ num: 1 }).sendMessage({ num: 2 }).sendMessage({ num: 3 }).end();
// data.num = 6

双向流#

客户端可以调用多次,服务端也可以接收多次数据,返回多个结果,类似于传统的 TCP 通信。通过 @GrpcMethod({type: GrpcStreamTypeEnum.DUPLEX}) 的参数来标识双工流式类型。

服务端示例如下:

import { GrpcMethod, GrpcStreamTypeEnum, Inject, MSProviderType, Provide, Provider } from '@midwayjs/decorator';import { Context } from '@midwayjs/grpc';import { math } from '../interface';import { Metadata } from '@grpc/grpc-js';
/** */@Provide()@Provider(MSProviderType.GRPC, { package: 'math' })export class Math implements math.Math {  @Inject()  ctx: Context;
  @GrpcMethod({ type: GrpcStreamTypeEnum.DUPLEX, onEnd: 'duplexEnd' })  async addMore(message: math.AddArgs) {    this.ctx.write({      id: message.id,      num: message.num + 10,    });  }
  async duplexEnd() {    console.log('got client end message');  }  // ...}

服务端可以随时使用 ctx.write  返回数据,也可以使用 ctx.end  来关闭流。

客户端示例:

对于双工通信的客户端,由于无法保证调用、返回的顺序,我们需要使用监听的模式来消费结果。

const clientStream = service.addMore().getCall();
let total = 0;let idx = 0;
duplexCall.on('data', (data: math.Num) => {  total += data.num;  idx++;  if (idx === 2) {    duplexCall.end();    // total => 29  }});
duplexCall.write({  num: 3,});
duplexCall.write({  num: 6,});

如果希望保证调用顺序,我们也提供了保证顺序的双向流调用方法,但是需要在 proto 中定义一个固定的 id,来确保顺序。

比如我们的 Math.proto,对每个入参和出参,都增加了一个固定的 id,所以可以固定顺序。


syntax = "proto3";
package math;
message AddArgs {  int32 id = 1;                         //  这里的 id 名字是固定的  int32 num = 2;}
message Num {  int32 id = 1;                         //  这里的 id 名字是固定的  int32 num = 2;}
service Math {  rpc Add (AddArgs) returns (Num) {  }
  rpc AddMore (stream AddArgs) returns (stream Num) {  }
  // 服务端往客户端推  rpc SumMany (AddArgs) returns (stream Num) {  }
  // 客户端往服务端推  rpc AddMany (stream AddArgs) returns (Num) {  }}

固定顺序的客户端调用方式如下:

// 保证顺序的双向流const t = service.addMore();
const result4 = await new Promise<number>((resolve, reject) => {  let total = 0;
  // 第一次调用和返回  t.sendMessage({    num: 2,  })    .then((res) => {      expect(res.num).toEqual(12);      total += res.num;    })    .catch((err) => console.error(err));
  // 第二次调用和返回  t.sendMessage({    num: 5,  })    .then((res) => {      expect(res.num).toEqual(15);      total += res.num;      resolve(total);    })    .catch((err) => console.error(err));
  t.end();});
// result4 => 27

默认的 id 为 id ,如果服务端定义不同,需要修改,可以在客户端调用时传递。

// 保证顺序的双向流const t = service.addMore({  messageKey: 'uid',});

元数据(Metadata)#

gRPC 的元数据等价于 HTTP 的上下文。

服务端通过 ctx.sendMetadata  方法返回元数据,也可以通过 ctx.metadata 获取客户端传递的元数据。

import { MSProviderType, Provider, Provide, GrpcMethod } from '@midwayjs/decorator';import { helloworld } from '../domain/helloworld';import { Metadata } from '@grpc/grpc-js';import { Context } from '@midwayjs/grpc';
/** * 实现 helloworld.Greeter 接口的服务 */@Provide()@Provider(MSProviderType.GRPC, { package: 'helloworld' })export class Greeter implements helloworld.Greeter {  @Inject()  ctx: Context;
  @GrpcMethod()  async sayHello(request: helloworld.HelloRequest) {    // 客户端传递的元数据    console.log(this.ctx.metadata);
    // 创建元数据    const meta = new Metadata();    this.ctx.metadata.add('xxx', 'bbb');    this.ctx.sendMetadata(meta);
    return { message: 'Hello ' + request.name };  }}

客户端通过方法的 options 参数传递元数据。

import { Metadata } from '@grpc/grpc-js';
const meta = new Metadata();meta.add('key', 'value');
const result = await service  .sayHello({    metadata: meta,  })  .sendMessage({    name: 'harry',  });

获取元数据相对麻烦一些。

普通一元调用(UnaryCall)获取元数据需要使用 sendMessageWithCallback  方法。

const call = service.sayHello().sendMessageWithCallback(  {    name: 'zhangting',  },  (err) => {    if (err) {      reject(err);    }  });call.on('metadata', (meta) => {  // output meta});

其他流式服务,可以通过 getCall()  方法获取原始客户端流对象,从而直接订阅。

// 获取服务,注意,这里没有 awaitconst call = service.addMany().getCall();call.on('metadata', (meta) => {  // output meta});

超时处理#

我们可以在调用服务时传递参数,单位毫秒。

const result = await service  .sayHello({    timeout: 5000,  })  .sendMessage({    name: 'harry',  });