WebSocket

原生 ws

import mitt, { type Emitter } from 'mitt'

/**
 * Optional settings for `WebsocketClient`. Every field may be omitted; the
 * defaults listed below are the ones the client initialises its options with.
 */
export interface WsOptions {
  PING?: string // heartbeat string sent to the server, default: 'ping'
  PONG?: string // incoming message equal to this string is not forwarded as a 'message' event, default: 'pong'
  heartTimeout?: number // delay in ms between heartbeat checks, default: 10 * 1000
  reconnectTimeout?: number // interval in ms between reconnection attempts, default: 30 * 1000
  autoReconnect?: boolean // whether to reconnect automatically, default: true
}

/** Names of the events that can be subscribed to through `WebsocketClient.on`. */
export type WsEvent = 'open' | 'message' | 'error' | 'close' | 'reconnect'

/**
 * Wrapper around the native `WebSocket` that adds a ping/pong heartbeat,
 * optional automatic reconnection and a small event bus.
 *
 * Events emitted on the bus: `open`, `message` (payload: the raw message data),
 * `error`, `close` and `reconnect` (after a successful automatic reconnection).
 */
export class WebsocketClient {
  /** True after the caller invoked `close()`; suppresses automatic reconnection. */
  private closeByActive = false
  /** True while a reconnection cycle is in progress. */
  private reconnecting = false
  /** True when the server has been heard from since the last heartbeat was sent. */
  private websocketState = false
  /** Handle of the reconnection interval, if one is running. */
  private reconnectId: ReturnType<typeof setInterval> | null = null
  /** Handle of the pending heartbeat check, if one is scheduled. */
  private heartId: ReturnType<typeof setTimeout> | null = null
  private socket: WebSocket | null = null
  private url: string
  /** Payload sent right after every successful (re)connection. */
  private firstParams?: unknown
  private options: Required<WsOptions> = {
    autoReconnect: true,
    PING: 'ping',
    PONG: 'pong',
    heartTimeout: 10 * 1000,
    reconnectTimeout: 30 * 1000
  }
  private bus: Emitter<Record<string, unknown>>

  /**
   * @param url WebSocket endpoint to connect to.
   * @param options Overrides for the default options; fields that are missing
   * or explicitly `undefined` keep their default value.
   */
  constructor(url: string, options?: WsOptions) {
    this.url = url
    this.bus = mitt()
    const defaults = this.options
    this.options = {
      PING: options?.PING ?? defaults.PING,
      PONG: options?.PONG ?? defaults.PONG,
      heartTimeout: options?.heartTimeout ?? defaults.heartTimeout,
      reconnectTimeout: options?.reconnectTimeout ?? defaults.reconnectTimeout,
      autoReconnect: options?.autoReconnect ?? defaults.autoReconnect
    }
  }

  /**
   * Opens a new connection, discarding any existing one.
   *
   * @param firstParams Optional payload sent as soon as the connection opens.
   * When truthy it is remembered and re-sent after every reconnection.
   */
  connect(firstParams?: unknown) {
    this.clear()
    // An explicit connect cancels a previous close(), so that a failure of this
    // very first attempt can still trigger automatic reconnection.
    this.closeByActive = false
    if (firstParams) {
      this.firstParams = firstParams
    }
    const socket = new WebSocket(this.url)
    socket.addEventListener('open', this.onOpen)
    socket.addEventListener('message', this.onMessage)
    socket.addEventListener('error', this.onError)
    socket.addEventListener('close', this.onClose)
    this.socket = socket
  }

  /**
   * Sends data when the socket is open; silently does nothing otherwise.
   * Non-string values are serialised with `JSON.stringify`.
   */
  send(data: unknown) {
    const socket = this.socket
    if (!socket || socket.readyState !== WebSocket.OPEN) return
    const payload = typeof data === 'string' ? data : JSON.stringify(data)
    socket.send(payload)
  }

  /** Stops the heartbeat, detaches all listeners from the current socket and closes it. */
  private clear() {
    this.stopHeart()
    const socket = this.socket
    if (!socket) return
    // The handlers are stable arrow-function properties, so the very same
    // references that were registered in connect() are removed here. Detaching
    // matters: a late `close` event from a discarded socket must not tear down
    // its replacement.
    socket.removeEventListener('open', this.onOpen)
    socket.removeEventListener('message', this.onMessage)
    socket.removeEventListener('error', this.onError)
    socket.removeEventListener('close', this.onClose)
    socket.close()
    this.socket = null
  }

  /**
   * Closes the connection on behalf of the caller and cancels any pending
   * heartbeat or reconnection. A `close` event is emitted if a live socket
   * was shut down.
   */
  close() {
    this.closeByActive = true
    this.stopReconnect()
    this.reconnecting = false
    const socket = this.socket
    const wasAlive = socket !== null && socket.readyState !== WebSocket.CLOSED
    this.clear()
    if (wasAlive) {
      // Listeners were detached above, so notify subscribers explicitly.
      this.bus.emit('close')
    }
  }

  /** Cancels the pending heartbeat check, if any. */
  private stopHeart() {
    if (this.heartId !== null) {
      clearTimeout(this.heartId)
      this.heartId = null
    }
  }

  /** Cancels the reconnection interval, if any. */
  private stopReconnect() {
    if (this.reconnectId !== null) {
      clearInterval(this.reconnectId)
      this.reconnectId = null
    }
  }

  /** Sends one heartbeat and schedules the check for the server's answer. */
  private startHeart() {
    this.send(this.options.PING)
    this.waitingServer()
  }

  /**
   * Waits `heartTimeout` ms for any message from the server. If one arrived
   * the next heartbeat is sent; otherwise a reconnection is started unless
   * the caller closed the connection.
   */
  private waitingServer() {
    this.stopHeart() // never run two heartbeat loops at once
    this.websocketState = false
    this.heartId = setTimeout(() => {
      this.heartId = null
      if (this.websocketState) {
        this.startHeart()
        return
      }
      if (!this.closeByActive) {
        this.reconnect()
      }
    }, this.options.heartTimeout)
  }

  /**
   * Starts a reconnection cycle: one immediate attempt, then one attempt every
   * `reconnectTimeout` ms until a connection opens. Does nothing when
   * automatic reconnection is disabled or a cycle is already running.
   */
  private reconnect() {
    if (!this.options.autoReconnect || this.reconnecting) return
    this.reconnecting = true
    this.stopReconnect()
    const attempt = () => {
      this.connect()
    }
    attempt()
    this.reconnectId = setInterval(attempt, this.options.reconnectTimeout)
  }

  /** Handles the socket's `open` event. */
  private readonly onOpen = (): void => {
    this.stopReconnect()
    const isReconnect = this.reconnecting // remember whether this open ends a reconnection cycle
    this.websocketState = true
    this.reconnecting = false
    this.bus.emit('open')
    if (this.firstParams) {
      // connect() was given a payload: send it as soon as the connection is up
      this.send(this.firstParams)
    }
    this.startHeart()
    if (isReconnect) {
      this.bus.emit('reconnect')
    }
  }

  /**
   * Handles the socket's `message` event. Any non-empty message counts as a
   * sign of life; everything except the PONG string and the literal
   * `'established'` is forwarded as a `message` event.
   */
  private readonly onMessage = (e: MessageEvent): void => {
    const message: unknown = e.data
    if (message) {
      this.websocketState = true
    }
    if (message !== this.options.PONG && message !== 'established') {
      this.bus.emit('message', message)
    }
  }

  /** Handles the socket's `error` event. */
  private readonly onError = (): void => {
    this.websocketState = false
    this.bus.emit('error')
    if (this.closeByActive) return
    this.reconnect()
  }

  /** Handles the socket's `close` event. */
  private readonly onClose = (): void => {
    this.websocketState = false
    this.stopHeart() // no point in pinging a closed socket
    this.bus.emit('close')
    if (this.closeByActive) return
    this.reconnect()
  }

  /** Registers a listener for one of the client's events. */
  on(type: WsEvent, handler: (event: any) => void) {
    this.bus.on(type, handler)
  }

  /** Removes a listener previously registered with `on`. */
  off(type: WsEvent, handler: (event: any) => void) {
    this.bus.off(type, handler)
  }
}

socket.io

import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'
import mitt, { type Emitter } from 'mitt'

/** Lookup table from an acknowledgement code to the callback that receives the server's response data. */
interface IcallBack {
  [propName: string | number]: (response: any) => void
}
/** Lookup table from an acknowledgement code to the command name that was sent with it. */
interface Icmd {
  [propName: string | number]: string
}

/**
 * Socket.IO client.
 *
 * Connects to the given server on construction, sends commands tagged with an
 * incrementing acknowledgement code and routes each `message` reply to the
 * callback registered for that code. Connection and message notifications are
 * also published on an internal event bus (`SOCKET_CONNECT`, `SOCKET_MESSAGE`)
 * that can be subscribed to with `on`.
 */
export class WebSocketClient {
  private socket: Socket
  /** Pending commands, keyed by acknowledgement code. */
  private cmds: Icmd = {}
  /** Pending response callbacks, keyed by acknowledgement code. */
  private callbacks: IcallBack = {}
  /** Last acknowledgement code handed out. */
  private ack = 0
  private bus: Emitter<Record<string, unknown>>
  /** Handle of the pending retry scheduled after a connection error. */
  private retryId: ReturnType<typeof setTimeout> | null = null
  /** True once `close()` has been called; disables every reconnection path. */
  private closed = false

  /**
   * @param serverUrl URL of the Socket.IO server.
   */
  constructor(serverUrl: string) {
    this.bus = mitt()

    // Create the socket without connecting, so that every handler is in place
    // before the first connection attempt is made.
    this.socket = io(serverUrl, {
      autoConnect: false,
      transports: ['websocket'],
    })

    // Fired on connection to the namespace (including successful reconnection).
    this.socket.on('connect', () => {
      // TODO: connection established; the editor UI only initialises after
      // receiving this event.
      this.bus.emit('SOCKET_CONNECT', {
        type: 'connected',
        message: serverUrl,
      })
    })

    // On a connection error, retry after one second.
    this.socket.on('connect_error', () => {
      if (this.closed) return
      if (this.retryId !== null) clearTimeout(this.retryId)
      this.retryId = setTimeout(() => {
        this.retryId = null
        if (!this.closed) this.socket.connect()
      }, 1000)
    })

    // On disconnection.
    this.socket.on('disconnect', (reason: string) => {
      // Explicit disconnections are not retried by Socket.IO itself, so
      // reconnect manually, unless the owner of this client closed it.
      if (this.closed) return
      if (
        reason === 'io server disconnect' ||
        reason === 'io client disconnect'
      ) {
        this.socket.connect()
      }
    })

    // Message from the server; `message` is a custom event name.
    this.socket.on('message', (ack: string | number, data: unknown) => {
      const callback = this.callbacks[ack]
      // A reply settles its request: drop the bookkeeping so the tables do
      // not grow for the lifetime of the client.
      delete this.callbacks[ack]
      delete this.cmds[ack]
      callback?.(data)
      this.bus.emit('SOCKET_MESSAGE', data)
    })

    this.socket.connect()
  }

  /**
   * Sends a message to the server.
   * @param cmd command (used as the Socket.IO event name)
   * @param data payload
   * @param callback invoked with the data of the reply carrying the same
   * acknowledgement code
   */
  request(
    cmd: string,
    data: any,
    callback?: (response: any) => void
  ): void {
    this.ack += 1
    const ack = this.ack
    if (callback !== undefined) {
      this.callbacks[ack] = callback
    }
    this.cmds[ack] = cmd
    this.socket.emit(cmd, ack, data)
  }

  /** Registers a listener for `SOCKET_CONNECT` or `SOCKET_MESSAGE`. */
  on(
    type: 'SOCKET_CONNECT' | 'SOCKET_MESSAGE',
    handler: (event: any) => void
  ): void {
    this.bus.on(type, handler)
  }

  /** Removes a listener previously registered with `on`. */
  off(
    type: 'SOCKET_CONNECT' | 'SOCKET_MESSAGE',
    handler: (event: any) => void
  ): void {
    this.bus.off(type, handler)
  }

  /**
   * Disconnects from the server for good: cancels any pending retry, prevents
   * further reconnection and forgets all pending requests.
   */
  close(): void {
    this.closed = true
    if (this.retryId !== null) {
      clearTimeout(this.retryId)
      this.retryId = null
    }
    this.socket.disconnect()
    this.callbacks = {}
    this.cmds = {}
  }
}