From a1a9eac58818f623d6582e424bc1d4f799156dde Mon Sep 17 00:00:00 2001 From: walker Date: Fri, 28 Jul 2023 14:58:51 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84websocket=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E5=8A=9F=E8=83=BD=20=E6=B7=BB=E5=8A=A0Centri?= =?UTF-8?q?fuge=E6=B6=88=E6=81=AF=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20websocket=E6=B6=88=E6=81=AF=E5=8F=AF=E6=94=AF?= =?UTF-8?q?=E6=8C=81json=E5=92=8Cprotobuf=E4=B8=A4=E7=A7=8D=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 1 + src/examples/app/configs/UrlManage.ts | 3 +- src/jlgraphic/app/JlGraphicApp.ts | 14 +- src/jlgraphic/message/BasicMessageClient.ts | 40 +++ src/jlgraphic/message/CentrifugeBroker.ts | 70 ++++++ src/jlgraphic/message/MessageBroker.ts | 248 +++++++++++++++++++ src/jlgraphic/message/WsMsgBroker.ts | 256 ++++---------------- src/jlgraphic/message/index.ts | 2 +- src/layouts/DrawLayout.vue | 40 ++- yarn.lock | 94 +++++++ 10 files changed, 523 insertions(+), 245 deletions(-) create mode 100644 src/jlgraphic/message/BasicMessageClient.ts create mode 100644 src/jlgraphic/message/CentrifugeBroker.ts create mode 100644 src/jlgraphic/message/MessageBroker.ts diff --git a/package.json b/package.json index 560d58c..2fca73d 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "@stomp/stompjs": "^7.0.0", "alova": "^2.7.1", "axios": "^1.4.0", + "centrifuge": "^4.0.1", "google-protobuf": "^3.21.2", "js-base64": "^3.7.5", "pinia": "^2.0.11", diff --git a/src/examples/app/configs/UrlManage.ts b/src/examples/app/configs/UrlManage.ts index 7f7a422..b238f92 100644 --- a/src/examples/app/configs/UrlManage.ts +++ b/src/examples/app/configs/UrlManage.ts @@ -9,5 +9,6 @@ export function getHttpBase() { } export function getWebsocketUrl() { - return `ws://${getHost()}/ws-default`; + const host = '192.168.3.233:8000'; + return `ws://${host}/connection/websocket`; } diff --git a/src/jlgraphic/app/JlGraphicApp.ts b/src/jlgraphic/app/JlGraphicApp.ts index eb396e4..7e58d64 100644 --- a/src/jlgraphic/app/JlGraphicApp.ts +++ b/src/jlgraphic/app/JlGraphicApp.ts @@ -21,10 +21,10 @@ import { import { AbsorbablePosition } from '../graphic'; import { AppWsMsgBroker, - StompCli, + WsMsgCli, type AppStateSubscription, - type StompCliOption, -} from '../message/WsMsgBroker'; + type MessageCliOption, +} from '../message'; import { OperationRecord } from '../operation/JlOperation'; import { AnimationManager, @@ -498,10 +498,10 @@ export class GraphicApp extends EventEmitter { } /** - * 使能websocket Stomp通信 + * 启动websocket消息客户端 */ - enableWsStomp(options: StompCliOption) { - StompCli.new(options); + enableWsMassaging(options: MessageCliOption) { + WsMsgCli.new(options); this.wsMsgBroker = new AppWsMsgBroker(this); } @@ -513,7 +513,7 @@ export class GraphicApp extends EventEmitter { // console.log('APP订阅', sub) this.wsMsgBroker.subscribe(sub); } else { - throw new Error('请先打开StompClient, 执行app.enableWebsocket()'); + throw new Error('订阅消息需先启动消息代理, 执行app.enableWebsocket()'); } } /** diff --git a/src/jlgraphic/message/BasicMessageClient.ts b/src/jlgraphic/message/BasicMessageClient.ts new file mode 100644 index 0000000..e1a41cf --- /dev/null +++ b/src/jlgraphic/message/BasicMessageClient.ts @@ -0,0 +1,40 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import EventEmitter from 'eventemitter3'; +import { IMessageClient } from './MessageBroker'; + +export type MessageHandler = (data: any) => void; + +export interface MessageClientEvents { + connecting: [ctx: any]; + connected: [ctx: any]; + disconnected: [ctx: any]; +} + +/** + * 订阅接口 + */ +export interface ISubscription { + /** + * 取消订阅 + */ + unsubscribe(): void; +} + +export abstract class MessageClient + extends EventEmitter + implements IMessageClient +{ + /** + * 订阅消息 + * @param destination + * @param handle + */ + abstract subscribe( + destination: string, + handle: MessageHandler + ): ISubscription; + + abstract get connected(): boolean; + + abstract close(): void; +} diff --git a/src/jlgraphic/message/CentrifugeBroker.ts b/src/jlgraphic/message/CentrifugeBroker.ts new file mode 100644 index 0000000..6c6122c --- /dev/null +++ b/src/jlgraphic/message/CentrifugeBroker.ts @@ -0,0 +1,70 @@ +import { Centrifuge, State } from 'centrifuge'; +import CentrifugeProtobuf from 'centrifuge/build/protobuf'; +import { MessageCliOption } from './MessageBroker'; +import { + ISubscription, + MessageClient, + MessageHandler, +} from './BasicMessageClient'; + +export class CentrifugeMessagingClient extends MessageClient { + options: MessageCliOption; + cli: Centrifuge; + constructor(options: MessageCliOption) { + super(); + this.options = options; + if (this.options.protocol === 'json') { + this.cli = new Centrifuge(options.wsUrl, { + token: options.token, + protocol: options.protocol, + }); + } else { + this.cli = new CentrifugeProtobuf(options.wsUrl, { + token: options.token, + protocol: options.protocol, + }); + } + + this.cli + .on('connecting', (ctx) => { + console.debug(`centrifuge连接中: ${ctx}, ${ctx.reason}`); + this.emit('connecting', ctx); + }) + .on('connected', (ctx) => { + console.debug(`连接成功: ${ctx.transport}`); + this.emit('connected', ctx); + }) + .on('disconnected', (ctx) => { + console.log(`断开连接: ${ctx.code}, ${ctx.reason}`); + this.emit('disconnected', ctx); + }) + .on('error', (ctx) => { + console.error('centrifuge错误', ctx); + }) + .connect(); + } + + get connected(): boolean { + return this.cli.state === State.Connected; + } + + subscribe(destination: string, handle: MessageHandler): ISubscription { + const sub = this.cli.newSubscription(destination); + sub + .on('publication', (ctx) => { + if (this.options.protocol === 'json') { + console.log('收到centrifuge消息:', ctx.data); + } + handle(ctx.data); + }) + .on('subscribed', (ctx) => { + console.log('订阅centrifuge服务消息成功', destination, ctx); + }) + .subscribe(); + return sub; + } + + close(): void { + this.cli.disconnect(); + } +} diff --git a/src/jlgraphic/message/MessageBroker.ts b/src/jlgraphic/message/MessageBroker.ts new file mode 100644 index 0000000..0cc405d --- /dev/null +++ b/src/jlgraphic/message/MessageBroker.ts @@ -0,0 +1,248 @@ +import EventEmitter from 'eventemitter3'; +import { CentrifugeMessagingClient } from './CentrifugeBroker'; +import { StompMessagingClient } from './WsMsgBroker'; +import { GraphicState } from '../core'; +import { GraphicApp } from '../app'; +import { + ISubscription, + MessageClientEvents, + MessageHandler, +} from './BasicMessageClient'; + +export enum ClientEngine { + Stomp, + Centrifugo, +} + +export interface MessageCliOption { + /** + * 客户端引擎 + */ + engine?: ClientEngine; + /** + * 消息协议,默认protobuf + */ + protocol?: 'json' | 'protobuf'; + /** + * websocket url地址 + */ + wsUrl: string; + /** + * 认证token + */ + token?: string; + /** + * 认证失败处理 + * @returns + */ + onAuthenticationFailed?: () => void; + /** + * 连接成功处理 + * @param ctx + * @returns + */ + onConnected?: (ctx: unknown) => void; + /** + * 端口连接处理 + */ + onDisconnected?: (ctx: unknown) => void; + reconnectDelay?: number; // 重连延时,默认3秒,设置为0不重连. + heartbeatIncoming?: number; // 服务端过来的心跳间隔,默认30秒 + heartbeatOutgoing?: number; // 到服务端的心跳间隔,默认30秒 +} + +const DefaultStompOption: MessageCliOption = { + engine: ClientEngine.Stomp, + protocol: 'protobuf', + wsUrl: '', + token: '', + reconnectDelay: 3000, + heartbeatIncoming: 30000, + heartbeatOutgoing: 30000, +}; + +export interface IMessageClient extends EventEmitter { + /** + * 订阅消息 + * @param destination + * @param handle + */ + subscribe(destination: string, handle: MessageHandler): ISubscription; + + get connected(): boolean; + + close(): void; +} + +export class WsMsgCli { + private static client: IMessageClient; + private static options: MessageCliOption; + private static appMsgBroker: AppWsMsgBroker[] = []; + static new(options: MessageCliOption) { + if (WsMsgCli.client) { + // 已经创建 + return; + } + WsMsgCli.options = Object.assign({}, DefaultStompOption, options); + if (WsMsgCli.options.engine == ClientEngine.Centrifugo) { + WsMsgCli.client = new CentrifugeMessagingClient(WsMsgCli.options); + } else { + WsMsgCli.client = new StompMessagingClient(WsMsgCli.options); + // WsMsgCli.client.onStompError = (frame: Frame) => { + // const errMsg = frame.headers['message']; + // if (errMsg === '401') { + // console.warn('认证失败,断开WebSocket连接'); + // StompCli.close(); + // if (StompCli.options.onAuthenticationFailed) { + // StompCli.options.onAuthenticationFailed(); + // } + // } else { + // console.error('收到Stomp错误消息', frame); + // } + // }; + } + const cli = WsMsgCli.client; + cli.on('connected', () => { + WsMsgCli.emitConnectStateChangeEvent(true); + }); + cli.on('disconnected', () => { + WsMsgCli.emitConnectStateChangeEvent(false); + }); + } + + static emitConnectStateChangeEvent(connected: boolean) { + WsMsgCli.appMsgBroker.forEach((broker) => { + broker.app.emit('websocket-connect-state-change', connected); + }); + } + + static isConnected(): boolean { + return WsMsgCli.client && WsMsgCli.client.connected; + } + + static trySubscribe( + destination: string, + handler: MessageHandler + ): ISubscription { + return WsMsgCli.client.subscribe(destination, handler); + // if (WsMsgCli.isConnected()) { + // } + // return undefined; + } + + static registerAppMsgBroker(broker: AppWsMsgBroker) { + WsMsgCli.appMsgBroker.push(broker); + } + + static removeAppMsgBroker(broker: AppWsMsgBroker) { + const index = WsMsgCli.appMsgBroker.findIndex((mb) => mb == broker); + if (index >= 0) { + WsMsgCli.appMsgBroker.splice(index, 1); + } + } + + static hasAppMsgBroker(): boolean { + return WsMsgCli.appMsgBroker.length > 0; + } + + /** + * 关闭websocket连接 + */ + static close() { + if (WsMsgCli.client) { + WsMsgCli.client.close(); + } + } +} + +// 状态订阅消息转换器 +export type GraphicStateMessageConvert = ( + message: Uint8Array +) => GraphicState[]; + +// 订阅消息处理器 +export type SubscriptionMessageHandle = (message: Uint8Array) => void; + +// 图形app状态订阅 +export interface AppStateSubscription { + /** + * 订阅路径 + */ + destination: string; + /** + * 图形状态消息转换 + */ + messageConverter?: GraphicStateMessageConvert; + /** + * 订阅消息处理 + */ + messageHandle?: SubscriptionMessageHandle; + /** + * 订阅成功对象,用于取消订阅 + * 非客户端使用 + */ + subscription?: ISubscription; +} + +/** + * 图形APP的websocket消息代理 + */ +export class AppWsMsgBroker { + app: GraphicApp; + subscriptions: Map = new Map< + string, + AppStateSubscription + >(); + + constructor(app: GraphicApp) { + this.app = app; + WsMsgCli.registerAppMsgBroker(this); + } + + subscribe(sub: AppStateSubscription) { + this.unsbuscribe(sub.destination); // 先尝试取消之前订阅 + sub.subscription = WsMsgCli.trySubscribe(sub.destination, (data) => { + if (sub.messageConverter) { + const graphicStates = sub.messageConverter(data); + this.app.handleGraphicStates(graphicStates); + } else if (sub.messageHandle) { + sub.messageHandle(data); + } else { + console.error( + `订阅destination:${sub.destination}没有消息处理器或图形状态消息转换器` + ); + } + }); + this.subscriptions.set(sub.destination, sub); + } + + resubscribe() { + this.subscriptions.forEach((record) => { + this.subscribe(record); + }); + } + + unsbuscribe(destination: string) { + const oldSub = this.subscriptions.get(destination); + if (oldSub) { + if (oldSub.subscription && WsMsgCli.isConnected()) { + oldSub.subscription.unsubscribe(); + } + oldSub.subscription = undefined; + } + } + + unsbuscribeAll() { + this.subscriptions.forEach((record) => { + this.unsbuscribe(record.destination); + }); + } + + /** + * 取消所有订阅,从通用Stomp客户端移除此消息代理 + */ + close() { + WsMsgCli.removeAppMsgBroker(this); + this.unsbuscribeAll(); + } +} diff --git a/src/jlgraphic/message/WsMsgBroker.ts b/src/jlgraphic/message/WsMsgBroker.ts index 4e79cc2..1849b54 100644 --- a/src/jlgraphic/message/WsMsgBroker.ts +++ b/src/jlgraphic/message/WsMsgBroker.ts @@ -1,246 +1,74 @@ +import { Client as StompClient, type Frame } from '@stomp/stompjs'; +import { MessageCliOption } from './MessageBroker'; import { - Client as StompClient, - StompSubscription, - type Frame, - type Message, - type messageCallbackType, -} from '@stomp/stompjs'; -import type { GraphicApp } from '../app/JlGraphicApp'; -import { GraphicState } from '../core/JlGraphic'; + ISubscription, + MessageClient, + MessageHandler, +} from './BasicMessageClient'; -export interface StompCliOption { - /** - * websocket url地址 - */ - wsUrl: string; - /** - * 认证token - */ - token?: string; - /** - * 认证失败处理 - * @returns - */ - onAuthenticationFailed?: () => void; - reconnectDelay?: number; // 重连延时,默认3秒,设置为0不重连. - heartbeatIncoming?: number; // 服务端过来的心跳间隔,默认30秒 - heartbeatOutgoing?: number; // 到服务端的心跳间隔,默认30秒 -} - -const DefaultStompOption: StompCliOption = { - wsUrl: '', - token: '', - reconnectDelay: 3000, - heartbeatIncoming: 30000, - heartbeatOutgoing: 30000, -}; - -export class StompCli { - private static client: StompClient; - private static options: StompCliOption; - private static appMsgBroker: AppWsMsgBroker[] = []; - /** - * key-订阅路径 - */ - subscriptions: Map = new Map< - string, - AppStateSubscription - >(); - private static connected = false; - static new(options: StompCliOption) { - if (StompCli.client) { - // 已经创建 - return; - } - StompCli.options = Object.assign({}, DefaultStompOption, options); - StompCli.client = new StompClient({ - brokerURL: StompCli.options.wsUrl, +export class StompMessagingClient extends MessageClient { + options: MessageCliOption; + cli: StompClient; + constructor(options: MessageCliOption) { + super(); + this.options = options; + this.cli = new StompClient({ + brokerURL: options.wsUrl, connectHeaders: { - Authorization: StompCli.options.token ? StompCli.options.token : '', + Authorization: options.token ? options.token : '', }, - reconnectDelay: StompCli.options.reconnectDelay, - heartbeatIncoming: StompCli.options.heartbeatIncoming, - heartbeatOutgoing: StompCli.options.heartbeatOutgoing, + reconnectDelay: options.reconnectDelay, + heartbeatIncoming: options.heartbeatIncoming, + heartbeatOutgoing: options.heartbeatOutgoing, }); - StompCli.client.onConnect = () => { - // console.log('websocket连接(重连),重新订阅', StompCli.appMsgBroker.length) - StompCli.emitConnectStateChangeEvent(true); - StompCli.appMsgBroker.forEach((broker) => { - broker.resubscribe(); - }); + this.cli.onConnect = () => { + this.emit('connected', ''); }; - - StompCli.client.onStompError = (frame: Frame) => { + this.cli.onStompError = (frame: Frame) => { const errMsg = frame.headers['message']; if (errMsg === '401') { console.warn('认证失败,断开WebSocket连接'); - StompCli.close(); - if (StompCli.options.onAuthenticationFailed) { - StompCli.options.onAuthenticationFailed(); - } } else { console.error('收到Stomp错误消息', frame); } }; - StompCli.client.onDisconnect = (frame: Frame) => { + this.cli.onDisconnect = (frame: Frame) => { console.log('Stomp 断开连接', frame); - StompCli.emitConnectStateChangeEvent(false); - }; - StompCli.client.onWebSocketClose = (evt: CloseEvent) => { - console.log('websocket 关闭', evt); - StompCli.emitConnectStateChangeEvent(false); + this.emit('disconnected', frame); }; // websocket错误处理 - StompCli.client.onWebSocketError = (err: Event) => { - console.log('websocket错误', err); + this.cli.onWebSocketError = (err: Event) => { + console.error('websocket错误', err); }; - StompCli.client.activate(); + this.cli.activate(); } - static emitConnectStateChangeEvent(connected: boolean) { - StompCli.connected = connected; - StompCli.appMsgBroker.forEach((broker) => { - broker.app.emit('websocket-connect-state-change', connected); - }); + get connected(): boolean { + return this.cli.connected; } - static isConnected(): boolean { - return StompCli.client && StompCli.client.connected; - } - - static trySubscribe( - destination: string, - handler: messageCallbackType - ): StompSubscription | undefined { - if (StompCli.isConnected()) { - return StompCli.client.subscribe(destination, handler, { - id: destination, - }); - } - return undefined; - } - - static registerAppMsgBroker(broker: AppWsMsgBroker) { - StompCli.appMsgBroker.push(broker); - } - - static removeAppMsgBroker(broker: AppWsMsgBroker) { - const index = StompCli.appMsgBroker.findIndex((mb) => mb == broker); - if (index >= 0) { - StompCli.appMsgBroker.splice(index, 1); - } - } - - static hasAppMsgBroker(): boolean { - return StompCli.appMsgBroker.length > 0; - } - - /** - * 关闭websocket连接 - */ - static close() { - StompCli.connected = false; - if (StompCli.client) { - StompCli.client.deactivate(); - } - } -} - -// 状态订阅消息转换器 -export type GraphicStateMessageConvert = ( - message: Uint8Array -) => GraphicState[]; - -// 订阅消息处理器 -export type SubscriptionMessageHandle = (message: Uint8Array) => void; - -// 图形app状态订阅 -export interface AppStateSubscription { - /** - * 订阅路径 - */ - destination: string; - /** - * 图形状态消息转换 - */ - messageConverter?: GraphicStateMessageConvert; - /** - * 订阅消息处理 - */ - messageHandle?: SubscriptionMessageHandle; - /** - * 订阅成功对象,用于取消订阅 - * 非客户端使用 - */ - subscription?: StompSubscription; -} - -/** - * 图形APP的websocket消息代理 - */ -export class AppWsMsgBroker { - app: GraphicApp; - subscriptions: Map = new Map< - string, - AppStateSubscription - >(); - - constructor(app: GraphicApp) { - this.app = app; - StompCli.registerAppMsgBroker(this); - } - - subscribe(sub: AppStateSubscription) { - this.unsbuscribe(sub.destination); // 先尝试取消之前订阅 - sub.subscription = StompCli.trySubscribe( - sub.destination, - (message: Message) => { - if (sub.messageConverter) { - const graphicStates = sub.messageConverter(message.binaryBody); - this.app.handleGraphicStates(graphicStates); - } else if (sub.messageHandle) { - sub.messageHandle(message.binaryBody); + subscribe(destination: string, handle: MessageHandler): ISubscription { + const sub = this.cli.subscribe( + destination, + (frame) => { + if (this.options.protocol === 'json') { + const data = JSON.parse(frame.body); + handle(data); } else { - console.error( - `订阅destination:${sub.destination}没有消息处理器或图形状态消息转换器` - ); + handle(frame.binaryBody); } + }, + { + id: destination, } ); - // console.log('代理订阅结果', sub.subscription) - this.subscriptions.set(sub.destination, sub); + return sub; } - resubscribe() { - this.subscriptions.forEach((record) => { - this.subscribe(record); - }); - } - - unsbuscribe(destination: string) { - const oldSub = this.subscriptions.get(destination); - if (oldSub) { - if (oldSub.subscription && StompCli.isConnected()) { - oldSub.subscription.unsubscribe(); - } - oldSub.subscription = undefined; - } - } - - unsbuscribeAll() { - this.subscriptions.forEach((record) => { - this.unsbuscribe(record.destination); - }); - } - - /** - * 取消所有订阅,从通用Stomp客户端移除此消息代理 - */ - close() { - StompCli.removeAppMsgBroker(this); - this.unsbuscribeAll(); + close(): void { + this.cli.deactivate(); } } diff --git a/src/jlgraphic/message/index.ts b/src/jlgraphic/message/index.ts index fddbf3f..349ebdd 100644 --- a/src/jlgraphic/message/index.ts +++ b/src/jlgraphic/message/index.ts @@ -1 +1 @@ -export * from './WsMsgBroker'; +export * from './MessageBroker'; diff --git a/src/layouts/DrawLayout.vue b/src/layouts/DrawLayout.vue index d1792b5..a73b152 100644 --- a/src/layouts/DrawLayout.vue +++ b/src/layouts/DrawLayout.vue @@ -120,13 +120,14 @@