Websocket
原生 ws
import mitt, { type Emitter } from 'mitt'
export interface WsOptions {
PING?: string // 心跳字符串,默认:ping
PONG?: string // 默认:pong
heartTimeout?: number // 心跳时间,默认:10 * 1000
reconnectTimeout?: number // 重连等待时间,默认:30 * 1000
autoReconnect?: boolean // 是否自动重连,默认:true
}
export type WsEvent = 'open' | 'message' | 'error' | 'close' | 'reconnect'
export class WebsocketClient {
private closeByActive = false // 是否主动关闭连接
private reconnecting = false // 是否正在重连
private websocketState = false // ws 状态是否正常
private reconnectId: NodeJS.Timeout | null = null
private socket: WebSocket | null = null
private url: string
private firstParams?: unknown // 首次连接需要发送的参数
private options: WsOptions = {
autoReconnect: true,
PING: 'ping',
PONG: 'pong',
heartTimeout: 10 * 1000,
reconnectTimeout: 30 * 1000
}
private bus: Emitter<Record<string, unknown>>
constructor(url: string, options?: WsOptions) {
this.url = url
this.bus = mitt()
if(options) {
this.options = Object.assign(this.options, options)
}
}
connect(firstParams?: unknown) {
this.clear()
if(firstParams) {
this.firstParams = firstParams
}
this.socket = new WebSocket(this.url)
this.socket.addEventListener('open', this.onOpen.bind(this))
this.socket.addEventListener('message', this.onMessage.bind(this))
this.socket.addEventListener('error', this.onError.bind(this))
this.socket.addEventListener('close', this.onClose.bind(this))
}
send(data: unknown) {
let params: any = data
if(typeof data !== 'string') {
params = JSON.stringify(data)
}
if(this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(params)
}
}
private clear() {
if(this.socket) {
this.socket.removeEventListener('open', this.onOpen.bind(this))
this.socket.removeEventListener('message', this.onMessage.bind(this))
this.socket.removeEventListener('error', this.onError.bind(this))
this.socket.removeEventListener('close', this.onClose.bind(this))
this.socket.close()
this.socket = null
}
}
close() {
this.closeByActive = true
this.clear()
}
private startHeart() {
this.send(this.options.PING)
this.waitingServer()
}
private waitingServer() {
this.websocketState = false
setTimeout(() => {
// 10s 后重发心跳
if(this.websocketState) {
this.startHeart()
return
}
if(!this.closeByActive) {
// 如果心跳没有正常响应,并且不是用户主动关闭就发起重连
this.reconnect()
}
}, this.options.heartTimeout)
}
private reconnect() {
if(!this.options.autoReconnect || !this.reconnecting) return
if(this.reconnectId) clearInterval(this.reconnectId)
const task = () => {
this.connect()
}
task()
this.reconnecting = true
this.reconnectId = setInterval(task, this.options.reconnectTimeout)
}
private onOpen() {
if(this.reconnectId) clearInterval(this.reconnectId)
const isReconnect = this.reconnecting // 记录一下是否是重连
this.websocketState = true
this.reconnecting = false
this.closeByActive = false
this.bus.emit('open')
if(this.firstParams) {
// 如果 connect 时有参数,连接上时发送一个请求
this.send(this.firstParams)
}
this.startHeart()
if(isReconnect) {
// 如果是重连,需要通知重连成功
this.bus.emit('reconnect')
}
}
private onMessage(e: any) {
const message = e.data
if(message) {
this.websocketState = true
}
if(message !== this.options.PONG && message !== 'established') {
// 接收到消息
this.bus.emit('message', message)
}
}
private onError(){
this.websocketState = false
this.bus.emit('error')
if(this.closeByActive) return
this.reconnect()
}
private onClose(){
this.websocketState = false
this.bus.emit('close')
if(this.closeByActive) return
this.reconnect()
}
// 注册事件监听
on(type: WsEvent, handler: (event: any) => void) {
this.bus.on(type, handler)
}
}
socket.io
import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'
import mitt, { type Emitter } from 'mitt'
interface IcallBack {
[propName: string | number]: (response: any) => void
}
interface Icmd {
[propName: string | number]: string
}
/**
* Socket.io 客户端
*/
export class WebSocketClient {
private socket: Socket
private cmds: Icmd
private callbacks: IcallBack
private ack: number | string
private bus: Emitter<Record<string, unknown>>
constructor(serverUrl: string) {
this.bus = mitt()
//初始化socket
this.socket = io(serverUrl, {
// 是否自动连接
autoConnect: false,
transports: ['websocket'],
})
this.socket.connect()
// 在连接到命名空间(包括成功的重新连接)时触发
this.socket.on('connect', () => {
// TODO: 连接成功,收到该事件后编辑器界面才会进行初始化
this.bus.emit('SOCKET_CONNECT', {
type: 'connected',
message: serverUrl,
})
})
// 连接错误时
this.socket.on('connect_error', () => {
setTimeout(() => {
this.socket.connect()
}, 1000)
})
// 断开时连接
this.socket.on('disconnect', (reason: string) => {
// 显示断开,需要手动重连
if (
reason === 'io server disconnect' ||
reason === 'io client disconnect'
) {
this.socket.connect()
}
})
// 接收到服务端消息,message 是自定义的字符串
this.socket.on('message', (ack: string, data: any) => {
this.callbacks[ack]?.(data)
this.bus.emit('SOCKET_MESSAGE', data)
}
)
// 存放传入的回掉方法<确认码, 回掉方法>
this.callbacks = {}
// 存储操作命令<确认码, 操作命令>
this.cmds = {}
//确认码
this.ack = 0
}
/**
* 发送消息
* @param cmd 操作命令
* @param data 消息
* @param callback 回调方法
*/
request(
cmd: string,
data: any,
callback?: (response: any) => void
): void {
const ack = ++(this.ack as number)
if (callback !== undefined) {
this.callbacks[ack] = callback
}
this.cmds[ack] = cmd
// 发送消息给服务器
this.socket.emit(cmd, ack, data)
}
}