最近在做一款轻量级IM产品,后端技术栈框架使用了nodejs + nestjs作为服务端。同时,还需要满足一个服务同时支持HTTP服务调用以及WebSocket服务调用,此文主要记录本次搭建过程,以及基本的服务端设计。
基本环境搭建
node v14.17.5
nestjs 全局命令行工具(npm i -g @nestjs/cli
)
本文不再详细介绍nestjs各种概念,请参考:First steps | NestJS - A progressive Node.js framework
直接创建一个Demo项目:
nest new nest-http-socket-demo
目录划分设计
等待项目完成以后(这个过程可能会持续比较久,因为创建好目录结构以后还会进行包安装),结构如下:
nest-http-websocket-demo ├─ .eslintrc.js ├─ .gitignore ├─ .prettierrc ├─ README.md ├─ nest-cli.json ├─ node_modules │└─ ... ... ├─ package.json ├─ src │├─ app.controller.spec.ts │├─ app.controller.ts │├─ app.module.ts │├─ app.service.ts │└─ main.ts ├─ test │├─ app.e2e-spec.ts │└─ jest-e2e.json ├─ tsconfig.build.json ├─ tsconfig.json └─ yarn.lock
初始的目录结构可能不太符合我们的期望,我们对目录结构进行适当的调整。主要分为几个目录:
src/common。该目录存放服务端和客户端公共涉及的内容。方便后续拆分出单独的npm包供服务端和客户端公用; src/base。该目录存放整个服务需要用到的一些基础内容,譬如拦截器、过滤器等; src/module。后续存放按照不同的业务领域拆分出的子目录; src/entity。存放数据定义等(本项目我们简化模型,认为数据传输的结构和服务中领域数据结构一致)。
调整后的src目录结构如下:
- src ├─ base ├─ common ├─ entity └─ module
基础类型定义
在规划API之前,我们先设计定义一些服务端基本数据结构。
服务端响应封装(ServerResponseWrapper)
众所周知,一般的服务端都会对原始返回数据进行一定的包装,增加返回码、错误消息等来明确的指出具体的错误内容,在我们的服务也不例外。于是,我们设计如下的结构体:
export interface ServerResponseWrapper { /** * 服务端返回码 */ returnCode: string; /** * 错误信息(如有,例如返回码非成功码) */ errorMessage?: string; /** * 返回数据(如有) */ data?: any; }
对于该结构来说,后续客户端也会使用相同的数据结构进行解析,所以我们可以考虑将该文件放在src/common中。
下面是一些常见的返回数据(纯样例):
// 获取用户基本信息成功 { "returnCode": "SUC00000", "data": { "username": "w4ngzhen", "lastLoginTime": "2022-11-22 11:50:22.000" } } // 获取用户名称出错(没有提供对应的userId) { "returnCode": "ERR40000", "errorMessage": "user id is empty.", } // 获取服务端时间 { "returnCode": "SUC0000", "data": "2022-11-22 11:22:33.000" }
返回码定义(ReturnCode)
为了统一返回码,我们在定义了一个ReturnCode实体类,由该类统一封装返回码。作为外部会涉及了解到的内容,我们也将该类放置于src/common中,且导出常用的错误码,代码如下:
export class ReturnCode { private readonly _preCode: 'SUC' | 'ERR'; private readonly _subCode: string; private readonly _statusCode: number; get codeString(): string { return `${this._preCode}${this._subCode}`; } get statusCode(): number { return this._statusCode; } constructor(prefix: 'SUC' | 'ERR', subCode: string, statusCode: number) { this._preCode = prefix; this._subCode = subCode; this._statusCode = statusCode; } } export const SUCCESS = new ReturnCode('SUC', '00000', 200); export const ERR_NOT_FOUND = new ReturnCode('ERR', '40400', 404);
服务业务异常(BizException)
为了便于在服务调用过程中,能够按照具体的业务层面进行异常抛出。我们定义一个名为BizException的类来封装业务异常。对于外部系统来说,该异常并不可见,所以我们把该类放置于src/base中:
import {ReturnCode} from "../common/return-code"; export class BizException { private readonly _errorCode: ReturnCode; private readonly _errorMessage: string; get errorCode(): ReturnCode { return this._errorCode; } get errorMessage(): string { return this._errorMessage; } protected constructor(errorEntity: ReturnCode, errorMessage: string) { this._errorMessage = errorMessage; this._errorCode = errorEntity; } static create(errEntity: ReturnCode, errMessage?: string): BizException { return new BizException(errEntity, errMessage); } }
接下来,我们为服务器规划两个API,分别体现HTTP服务和WebSocket服务。
HTTP服务开发
基础服务
首先,我们设计一个简单用户信息查询服务接口。该接口可以根据传递而来的用户ID(userId)返回对应的用户信息:
GET /users?userId=${userId}
为了实现上述接口,我们按照如下流程进行API搭建:
在src/entity目录中,我们创建一个user目录,并在其中创建user.dto.ts文件专门用于定义用户User这个数据传输结构,内容如下:
// src/entity/user/user.dto.ts export interface UserDto { userId: string; username: string; age: number; }
在src/module创建一个user目录,划分用户user相关业务领域内容。同时,在其中创建user.service.ts,存放处理用户的相关服务代码,内容如下:
// src/module/user/user.service.ts import {Injectable} from '@nestjs/common'; import {UserDto} from "../../entity/user/user.dto"; @Injectable() export class UserService { async getUserById(userId: string): Promise<UserDto> { // 测试数据 const demoData: UserDto[] = [ { userId: 'tom', username: 'Tom', age: 10 }, { userId: 'jerry', username: 'Jerry', age: 11 } ]; return demoData.find(u => u.userId === userId); } }
同样的,我们在src/module/user中创建User的Controller(user.controller.ts
),增加GET /users
接口,请求参数并调用服务:
import {Controller, Get, Param, Query} from '@nestjs/common'; import {UserService} from './user.service'; import {UserDto} from "../../entity/user/user.dto"; @Controller("users") export class UserController { constructor(private readonly userService: UserService) { } @Get() async getHello(@Query('userId') userId: string): Promise<UserDto> { return this.userService.getUserById(userId); } }
创建用户模块,将controller、service注册到用户模块中(src/module/user/user.module.ts
):
import { Module } from '@nestjs/common'; import { UserController } from './user.controller'; import { UserService } from './user.service'; @Module({ imports: [], controllers: [UserController], providers: [UserService], }) export class UserModule {}
将用户模块注册给全局总模块app.module.ts中:
import { AppService } from './app.service'; +import {UserModule} from "./module/user/user.module"; @Module({ - imports: [], + imports: [UserModule], controllers: [AppController], providers: [AppService], })
完成上述操作以后,我们就可以启动服务进行验证了:
成功响应拦截器
上面的接口返回可以看出,Controller返回是什么样的结构体,前端请求到的数据就是什么结构,但我们希望将数据按照ServerResponseWrapper结构进行封装。在nestjs中,可以通过实现来自@nestjs/common
中的NestInterceptor
接口来编写我们自己的响应拦截,统一处理响应来实现前面的需求。按照我们之前规划,我们首先在src/base中创建interceptor目录,然后在里面创建http-service.response.interceptor.ts
,内容如下:
// src/base/interceptor/http-service.response.interceptor.ts import {CallHandler, ExecutionContext, NestInterceptor} from "@nestjs/common"; import {map, Observable} from "rxjs"; import {ServerResponseWrapper} from "../../common/server-response-wrapper"; import {SUCCESS} from "../../common/return-code"; /** * 全局Http服务响应拦截器 * 该Interceptor在main中通过 * app.useGlobalInterceptors 来全局引入, * 仅处理HTTP服务成功响应拦截,异常是不会进入该拦截器 */ export class HttpServiceResponseInterceptor implements NestInterceptor { intercept(context: ExecutionContext, next: CallHandler): Observable<any> | Promise<Observable<any>> { return next.handle().pipe(map(data => { // 进入该拦截器,说明没有异常,使用成功返回 const resp: ServerResponseWrapper = { returnCode: SUCCESS.codeString, data: data }; return resp; })) } }
创建完成后,我们在main入口中,需要将该响应拦截器注册到全局中:
// src/main.ts async function bootstrap() { const app = await NestFactory.create(AppModule); + + // 增加HTTP服务的成功响应拦截器 + app.useGlobalInterceptors(new HttpServiceResponseInterceptor()); + await app.listen(3000); } bootstrap();
完成配置以后,我们可以再次调用API来查看结果:
可以看到,尽管我们的Controller返回的是一个实际数据结构(Promise也适用),但是经过响应拦截器的处理,我们完成了对响应体的包裹封装。
异常过滤器
上述我们完成一个调用,并对响应成功的数据进行了包裹,但面对异常情况同样适用吗?如果不适用又需要如何处理呢?
首先,我们增加一个专门处理字段错误的错误码ReturnCode:
// src/common/return-code.ts export const SUCCESS = new ReturnCode('SUC', '00000', 200); +export const ERR_REQ_FIELD_ERROR = new ReturnCode('ERR', '40000', 400); export const ERR_NOT_FOUND = new ReturnCode('ERR', '40400', 404);
然后,我们在UserService中适当修改一下getUserById的实现,加入userId判空判断,并在为空的时候,抛出业务异常(这个过程我们顺便安装了lodash):
+import * as _ from 'lodash'; +import {BizException} from "../../common/biz-exception"; +import {ERR_REQ_FIELD_ERROR} from "../../common/return-code"; @Injectable() export class UserService { async getUserById(userId: string): Promise<UserDto> { +if (_.isEmpty(userId)) { +throw BizException.create(ERR_REQ_FIELD_ERROR, 'user id is empty'); +} ... ... } }
完成上述修改后,我们尝试发请求时候,故意不填写userId,得到如下的结果:
可以看到,尽管nestjs帮助我们进行一定的封装,但是结构体与我们一开始定义的ServerResponseWrapper是不一致的。为了保持一致,我们需要接管nestjs的异常处理,并转换为我们自己的wrapper结构,而接管的方式则是创建一个实现ExceptionFilter接口的类(按照路径划分,我们将这个类所在文件http-service.exception.filter.ts
存放于src/base/filter目录下):
import {ArgumentsHost, Catch, ExceptionFilter, HttpException} from "@nestjs/common"; import {ServerResponseWrapper} from "../../common/server-response-wrapper"; import {BizException} from "../../common/biz-exception"; /** * 全局Http服务的异常处理, * 该Filter在main中通过 * app.useGlobalExceptionFilter来全局引入, * 仅处理HTTP服务 */ @Catch() export class HttpServiceExceptionFilter implements ExceptionFilter { catch(exception: any, host: ArgumentsHost): any { // 进入该拦截器,说明http调用中存在异常,需要解析异常,并返回统一处理 let responseWrapper: ServerResponseWrapper; let httpStatusCode: number; if (exception instanceof BizException) { // 业务层Exception responseWrapper = { returnCode: exception.errorCode.codeString, errorMessage: exception.errorMessage } httpStatusCode = exception.errorCode.statusCode; } else if (exception instanceof HttpException) { // 框架层的Http异常 responseWrapper = { returnCode: 'IM9009', errorMessage: exception.message, } httpStatusCode = exception.getStatus(); } else { // 其他错误 responseWrapper = { returnCode: 'IM9999', errorMessage: 'server unknown error: ' + exception.message, }; httpStatusCode = 500; } // 该拦截器处理HTTP服务的异常,所以手动切换到HTTP Host // 并获取响应response,进行HTTP响应的写入 const httpHost = host.switchToHttp(); const response = httpHost.getResponse(); response.status(httpStatusCode).json(responseWrapper); } }
该类的核心点在于,对捕获到的异常进行解析后,我们会通过参数ArgumentsHost来获取实际的HTTP Host,并从中获取response对象,调用相关支持的方法来控制响应response的内容(http状态码以及响应体内容)。
最后,我们依然在main里面进行注册配置:
+import {HttpServiceExceptionFilter} from "./base/filter/http-service.exception.filter"; async function bootstrap() { const app = await NestFactory.create(AppModule); // 增加HTTP服务的成功响应拦截器 app.useGlobalInterceptors(new HttpServiceResponseInterceptor()); + // 增加HTTP服务的异常过滤器,进行响应包裹 + app.useGlobalFilters(new HttpServiceExceptionFilter()); await app.listen(3000); }
完成开发配置以后,我们重启服务,通过调用接口可以看到对应异常返回:
WebSocket服务
在nestjs中想要集成WebSocket服务也很容易。
首先,我们使用一个装饰器@WebSocketGateway()
来表明一个类是一个WebSocket的网关(Gateway),这个装饰器可以指定WebSocket服务的端口等信息。通常情况下,我们可以设置与HTTP服务不一样的端口,这样我们就可以在一个台服务上通过不同的端口暴露HTTP和WebSocket服务。当然,这不是必须,只是为了更好的区分服务。
其次,我们需要明白在nestjs可以使用ws或者socket.io两种具体实现的websocket平台。什么是具体平台?简单来讲,nestjs只负责设置一个标准的WebSocket网关规范,提供通用的API、接口、装饰器等,各个平台则是根据nestjs提供的规范进行实现。在本例中,我们选择使用socket.io作为nestjs上WebSocket具体的实现,因为socket.io是一个比较著名websocket库,同时支持服务端和客户端,并且在客户端/服务端均内建支持了"请求 - 响应"一来一回机制。
前置准备
依赖安装
nestjs中的websocket是一个独立的模块,且我们选取了socket.io作为websocket的实现,所以我们需要首先安装一下的基础模块:
yarn add @nestjs/websockets @nestjs/platform-socket.io
网关创建
websocket的相关内容,我们同样作为一种模块进行编写。于是,我们在src/module/目录中创建websocket文件夹,并在里面创建一个文件:my-websocket.gateway.ts,编写WS网关MyWebSocketGateway类的内容:
import {WebSocketGateway} from "@nestjs/websockets"; @WebSocketGateway(4000, { transports: ['websocket'] }) export class MyWebSocketGateway { }
一个简单的WebSocket网关就创建完成了。我们首先设定了WebSocket服务的端口号为4000(与HTTP服务的3000隔离开);其次,需要特别提一下transports参数,可选择的transport有两种:
polling(HTTP长连接轮询)
该机制由连续的 HTTP 请求组成:
长时间运行的请求,用于从服务器接收数据GET
短运行请求,用于将数据发送到服务器POST
由于传输的性质,连续的发出可以在同一 HTTP 请求中连接和发送。
也就是说,polling本质上是利用HTTP请求+轮询来完成所谓的双工通讯,在某些古老的没有实现真正WebSocket协议的浏览器作为一种实现方案。
websocket(网络套接字)
WebSocket 传输由WebSocket 连接组成,该连接在服务器和客户端之间提供双向和低延迟的通信通道。这是真正的长连接双工通讯协议。
所以,在通讯的过程中,服务端与客户端要保持相匹配的传输协议。
模块创建注册
同样的,我们在src/module/websocket中创建一个my-websocket.module.ts文件,内容如下:
import {MyWebSocketGateway} from "./my-websocket.gateway"; import {Module} from "@nestjs/common"; @Module({ providers: [MyWebSocketGateway] }) export class MyWebSocketModule { }
主要内容是将MyWebSocketGateway注册到模块中。
最后我们将MyWebSocket模块注册到根模块中:
+import {MyWebSocketModule} from "./module/websocket/my-websocket.module"; @Module({ - imports: [UserModule], + imports: [UserModule, MyWebSocketModule], controllers: [AppController], providers: [AppService], }) export class AppModule {}
基础服务
我们先设定这样一个场景:客户端连接上WebSocket服务后,可以给服务端发送一份JSON数据(内容加下方),服务端校验该数据后,在控制台打印数据。
{ "name": "w4ngzhen" }
对于服务端来说,我们首先需要订阅事件(subscribe),假设发送JSON数据的事件为hello
,那么我们可以通过如下的方式来进行订阅:
export class MyWebSocketGateway { @SubscribeMessage('hello') hello(@MessageBody() reqData: { name: string }) { if (!reqData || !reqData.name) { throw BizException.create(ERR_REQ_FIELD_ERROR, 'data is empty'); } console.log(JSON.stringify(reqData)); } }
测试WebSocket,可以使用postman来进行,只需要创建个一WebSocket的请求,在postman中按下CTRL+N(macOS为command+N),可以选择WebSocket请求:
创建后,需要注意,由于我们nestjs集成的WebSocket实现使用的socket.io,所以客户端需要匹配对应的实现(这点主要是为了匹配”请求-响应“一来一回机制)
完成配置后,我们可以采用如下的步骤进行事件发送:
发送完成后,就会看到postman的打印和nodejs服务控制台的打印,符合我们的预期:
当然,我前面提到过socket.io支持事件一来一回的请求响应模式。在nestjs中的WebSocket网关,只需要在对应的请求返回值即可:
@SubscribeMessage('hello') hello(@MessageBody() reqData: { name: string }) { if (!reqData || !reqData.name) { throw BizException.create(ERR_REQ_FIELD_ERROR, 'data is empty'); } console.log(JSON.stringify(reqData)); +return 'received reqData'; }
在postman的地方,我们需要发送的时候勾选上Acknowledgement
:
完成以后,我们重新连接服务并发送数据,就可以看到一条完整的事件处理链路了:
至此,我们就完成了在Nestjs集成一个基础的WebSocket服务了。
当然,我们的工作还没有结束。在前面我们对HTTP服务编写了成功响应拦截器以及异常过滤器,接下来,我们按照同样的方式编写WebSocket的相关处理。
成功响应拦截器
对于集成在nestjs中的WebSocket服务,想要编写并配置一个成功响应拦截器并不复杂,没有什么坑。
首先,我们仿照着http-service.response.interceptor.ts,编写一个几乎完全一样的ws-service.response.interceptor.ts,与HTTP的成功响应拦截器放在相同目录src/base/interceptor中:
// src/base/interceptor/ws-service.response.interceptor.ts import {CallHandler, ExecutionContext, NestInterceptor} from "@nestjs/common"; import {map, Observable} from "rxjs"; import {ServerResponseWrapper} from "../../common/server-response-wrapper"; import {SUCCESS} from "../../common/return-code"; /** * 全局WebSocket服务响应拦截器 * 该Interceptor在网关中通过装饰器 @UseInterceptors 使用 * 仅处理WebSocket服务成功响应拦截,异常是不会进入该拦截器 */ export class WsServiceResponseInterceptor implements NestInterceptor { intercept(context: ExecutionContext, next: CallHandler): Observable<any> | Promise<Observable<any>> { return next.handle().pipe(map(data => { // 进入该拦截器,说明没有异常,使用成功返回 const resp: ServerResponseWrapper = { returnCode: SUCCESS.codeString, data: data }; return resp; })) } }
其次,与HTTP注册拦截器不同的是,nestjs中注册WebSocket的拦截器,需要在网关类上使用装饰器进行:
+ // 安装WebSocket成功响应拦截器 + @UseInterceptors(new WsServiceResponseInterceptor()) @WebSocketGateway(4000, { transports: ['websocket'] }) export class MyWebSocketGateway { ... ...
配置完成以后,我们重启服务,再次使用postman进行WebSocket事件请求,则会看到经过包装后的响应体:
异常过滤器
当然,我们尝试不发送任何的数据。理论上,则会进入校验流程不通过的场景,抛出BizException。在实际的发送中,我们会看到,postman无法接受到异常:
在服务端会看到一个异常报错:
对于这个问题,我们的需求是无论是否有异常,都需要使用ServerResponseWrapper进行包裹。与HTTP不同的是,WebSocket的异常过滤器需要实现WsExceptionFilter
接口,实现该接口的catch方法:
import {ArgumentsHost, Catch, ExceptionFilter, HttpException, WsExceptionFilter} from "@nestjs/common"; import {ServerResponseWrapper} from "../../common/server-response-wrapper"; import {BizException} from "../../common/biz-exception"; /** * 全局WebSocket服务的异常处理, * 该Filter在网关中通过 使用 @UseFilters 来进行注册 * 仅处理WebSocket网关服务 */ @Catch() export class WsServiceExceptionFilter implements WsExceptionFilter { catch(exception: any, host: ArgumentsHost): any { // 进入该拦截器,说明http调用中存在异常,需要解析异常,并返回统一处理 let responseWrapper: ServerResponseWrapper; if (exception instanceof BizException) { // 业务层Exception responseWrapper = { returnCode: exception.errorCode.codeString, errorMessage: exception.errorMessage } } else { // 其他错误 responseWrapper = { returnCode: 'IM9999', errorMessage: 'server unknown error: ' + exception.message, }; } // 对异常进行封装以后,需要让框架继续进行调用处理,才能正确的响应给客户端 // 此时,需要提取到callback这个函数 // 参考:https://stackoverflow.com/questions/61795299/nestjs-return-ack-in-exception-filter const callback = host.getArgByIndex(2); if (callback && typeof callback === 'function') { callback(responseWrapper); } } }
这个Filter与HTTP服务中的异常过滤器差异点主要三点:
1)WebSocket中不存在HTTP状态码且不存在HTTP异常,所以我们只需要解析区分BizException与非BizException。
2)WebSocket的异常过滤器中,想要继续后的数据处理,需要在方法返回前,从host中取到第三个参数对象(索引值为2),该值是一个回调函数,将处理后的数据作为参数,调用该callback方法,框架才能继续处理。-- WebSocket异常过滤器最终返回的关键点。
// 对异常进行封装以后,需要让框架继续进行调用处理,才能正确的响应给客户端 // 此时,需要提取到callback这个函数 // 参考:https://stackoverflow.com/questions/61795299/nestjs-return-ack-in-exception-filter const callback = host.getArgByIndex(2); if (callback && typeof callback === 'function') { callback(responseWrapper); }
3)注册该异常过滤器同样和WebSocket的响应拦截器一样,需要在网关类上使用@UseFilters
装饰器。
// 安装WebSocket成功响应拦截器 @UseInterceptors(new WsServiceResponseInterceptor()) + // 安装WebSocket异常过滤器 + @UseFilters(new WsServiceExceptionFilter()) @WebSocketGateway(4000, { transports: ['websocket'] })
完成该配置后,我们再次重启服务,使用postman,可以看到wrapper包装后的效果:
附录
本次demo已经提交至github
w4ngzhen/nest-http-websocket-demo (github.com)
同时,按照每一阶段进行了适配提交:
add: 添加WebSocket异常过滤器并注册到WebSocket网关中。 add: 添加WebSocket成功响应拦截器并注册到WebSocket网关中。 modify: 添加WebSocket的事件响应数据。 modify: 增减对事件”hello“的处理,并在控制台打印请求。 add: 创建一个基本的WebSocket网关以及将网关模块进行注册。 add: 增加nestjs websocket依赖、socket.io平台实现。 add: 添加HTTP服务异常过滤器,对异常进行解析并返回Wrapper包裹数据。 modify: 修改获取用户信息逻辑,加入userId判空检查。 add: 添加HTTP服务成功响应拦截器,对返回体进行统一Wrapper包裹。 modify: 注册user模块到app主模块。 add: 新增用户User模块相关的dto定义、service、controller以及module。 add: 添加ServerResponseWrapper作为服务端响应数据封装;添加返回码类,统一定义返回码;添加业务异常类,封装业务异常。 init: 初始化项目结构
我会逐步完善这个demo,接入各种常用的模块(数据库、Redis、S3-ECS等)。本文是本demo的初始阶段,已经发布于1.0版本tag。