From ef8a0ae988ecde0f16969a741a4a2911c7dbf4ff Mon Sep 17 00:00:00 2001 From: walker Date: Wed, 2 Aug 2023 15:38:07 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=B6=88=E6=81=AF=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/jlgraphic/app/JlGraphicApp.ts | 15 +- src/jlgraphic/message/BasicMessageClient.ts | 141 ++++++++++++++++-- src/jlgraphic/message/CentrifugeBroker.ts | 43 +++--- src/jlgraphic/message/MessageBroker.ts | 152 ++++++++------------ src/jlgraphic/message/WsMsgBroker.ts | 21 ++- 5 files changed, 225 insertions(+), 147 deletions(-) diff --git a/src/jlgraphic/app/JlGraphicApp.ts b/src/jlgraphic/app/JlGraphicApp.ts index 7e58d64..7dd0a46 100644 --- a/src/jlgraphic/app/JlGraphicApp.ts +++ b/src/jlgraphic/app/JlGraphicApp.ts @@ -533,24 +533,17 @@ export class GraphicApp extends EventEmitter { */ handleGraphicStates(graphicStates: GraphicState[]) { graphicStates.forEach((state) => { - const list = this.queryStore.queryByIdOrCodeAndType( + const g = this.queryStore.queryByCodeAndType( state.code, state.graphicType ); - if (list.length == 0) { + if (!g) { const template = this.getGraphicTemplatesByType(state.graphicType); const g = template.new(); g.loadState(state); this.addGraphics(g); - } else { - // 调整逻辑:所有图形对象状态数据更新完后再统一重绘 - list - .filter((g) => { - return g.updateStates(state); - }) - .forEach((g) => { - g.repaint(); - }); + } else if (g.updateStates(state)) { + g.repaint(); } }); } diff --git a/src/jlgraphic/message/BasicMessageClient.ts b/src/jlgraphic/message/BasicMessageClient.ts index e1a41cf..f63d55a 100644 --- a/src/jlgraphic/message/BasicMessageClient.ts +++ b/src/jlgraphic/message/BasicMessageClient.ts @@ -1,29 +1,37 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import EventEmitter from 'eventemitter3'; -import { IMessageClient } from './MessageBroker'; - -export type MessageHandler = (data: any) => void; +import { GraphicApp } from '../app'; +import { CompleteMessageCliOption, IMessageClient } from './MessageBroker'; export interface MessageClientEvents { - connecting: [ctx: any]; connected: [ctx: any]; disconnected: [ctx: any]; } -/** - * 订阅接口 - */ -export interface ISubscription { +export type HandleMessage = (data: any) => void; + +export interface IMessageHandler { /** - * 取消订阅 + * id */ - unsubscribe(): void; + get App(): GraphicApp; + /** + * 处理消息数据 + * @param data + */ + handle(data: any): void; } export abstract class MessageClient extends EventEmitter implements IMessageClient { + options: CompleteMessageCliOption; + subClients: SubscriptionClient[] = []; // 订阅客户端 + constructor(options: CompleteMessageCliOption) { + super(); + this.options = options; + } /** * 订阅消息 * @param destination @@ -31,10 +39,119 @@ export abstract class MessageClient */ abstract subscribe( destination: string, - handle: MessageHandler - ): ISubscription; + handle: HandleMessage + ): IUnsubscriptor; + + unsubscribe(destination: string): void { + this.unsubscribe0(destination); + const idx = this.subClients.findIndex( + (cli) => cli.destination === destination + ); + if (idx >= 0) { + this.subClients.splice(idx, 1); + } + } + + abstract unsubscribe0(destination: string): void; + + getOrNewSubClient(destination: string): SubscriptionClient { + let cli = this.subClients.find((cli) => cli.destination === destination); + if (!cli) { + // 不存在,新建 + cli = new SubscriptionClient(this, destination, this.options.protocol); + this.subClients.push(cli); + } + return cli; + } + + addSubscription(destination: string, handler: IMessageHandler): void { + const cli = this.getOrNewSubClient(destination); + cli.addHandler(handler); + } + removeSubscription(destination: string, handle: IMessageHandler): void { + this.getOrNewSubClient(destination).removeHandler(handle); + } abstract get connected(): boolean; abstract close(): void; } + +/** + * 订阅取消接口 + */ +export interface IUnsubscriptor { + /** + * 取消订阅 + */ + unsubscribe(): void; +} + +export class SubscriptionClient { + mc: MessageClient; + destination: string; + protocol: 'json' | 'protobuf'; + handlers: IMessageHandler[] = []; + unsubscriptor?: IUnsubscriptor; + constructor( + mc: MessageClient, + destination: string, + protocal: 'json' | 'protobuf' + ) { + this.mc = mc; + this.destination = destination; + this.protocol = protocal; + this.mc.on('disconnected', this.onDisconnect, this); + this.mc.on('connected', this.trySubscribe, this); + } + + addHandler(handler: IMessageHandler) { + if (this.handlers.filter((h) => h.App === handler.App).length == 0) { + this.handlers.push(handler); + } + if (!this.unsubscriptor) { + this.trySubscribe(); + } + } + + removeHandler(handler: IMessageHandler) { + const idx = this.handlers.findIndex((h) => h.App === handler.App); + if (idx >= 0) { + this.handlers.splice(idx, 1); + } + if (this.handlers.length == 0) { + console.log(`订阅${this.destination}没有消息监听处理,移除订阅`); + this.unsubscribe(); + } + } + + trySubscribe(): void { + if (this.mc.connected) { + this.unsubscriptor = this.mc.subscribe( + this.destination, + this.handleMessage + ); + } + } + + unsubscribe(): void { + this.mc.unsubscribe(this.destination); + } + + handleMessage(data: any) { + if (this.protocol === 'json') { + console.debug('收到消息:', data); + } + this.handlers.forEach((handler) => { + try { + handler.handle(data); + } catch (error) { + console.error('图形应用状态消息处理异常', error); + } + }); + } + + onDisconnect() { + this.unsubscriptor = undefined; + } +} diff --git a/src/jlgraphic/message/CentrifugeBroker.ts b/src/jlgraphic/message/CentrifugeBroker.ts index 3e9b1a7..ece12f5 100644 --- a/src/jlgraphic/message/CentrifugeBroker.ts +++ b/src/jlgraphic/message/CentrifugeBroker.ts @@ -1,17 +1,17 @@ -import { State } from 'centrifuge'; +import { State, Subscription } from 'centrifuge'; import Centrifuge from 'centrifuge/build/protobuf'; -import { MessageCliOption } from './MessageBroker'; import { - ISubscription, + HandleMessage, + IUnsubscriptor, MessageClient, - MessageHandler, } from './BasicMessageClient'; +import { CompleteMessageCliOption } from './MessageBroker'; export class CentrifugeMessagingClient extends MessageClient { - options: MessageCliOption; cli: Centrifuge; - constructor(options: MessageCliOption) { - super(); + + constructor(options: CompleteMessageCliOption) { + super(options); this.options = options; if (this.options.protocol === 'json') { this.cli = new Centrifuge(options.wsUrl, { @@ -28,7 +28,6 @@ export class CentrifugeMessagingClient extends MessageClient { this.cli .on('connecting', (ctx) => { console.debug(`centrifuge连接中: ${ctx}, ${ctx.reason}`); - this.emit('connecting', ctx); }) .on('connected', (ctx) => { console.debug(`连接成功: ${ctx.transport}`); @@ -48,31 +47,23 @@ export class CentrifugeMessagingClient extends MessageClient { return this.cli.state === State.Connected; } - subscribe(destination: string, handle: MessageHandler): ISubscription { + subscribe(destination: string, handle: HandleMessage): IUnsubscriptor { let sub = this.cli.getSubscription(destination); if (!sub) { sub = this.cli.newSubscription(destination); - sub - .on('publication', (ctx) => { - if (this.options.protocol === 'json') { - console.log('收到centrifuge消息:', ctx.data); - } - try { - handle(ctx.data); - } catch (error) { - console.log('websocket状态消息处理异常', error); - } - }) - .on('subscribed', (ctx) => { - console.log('订阅centrifuge服务消息成功', destination, ctx); - }) - .subscribe(); - } else { - sub.subscribe(); + sub.on('publication', (ctx) => { + handle(ctx.data); + }); } + sub.subscribe(); return sub; } + unsubscribe0(destination: string): void { + const subClient = this.getOrNewSubClient(destination); + this.cli.removeSubscription(subClient.unsubscriptor as Subscription); + } + close(): void { this.cli.disconnect(); } diff --git a/src/jlgraphic/message/MessageBroker.ts b/src/jlgraphic/message/MessageBroker.ts index 2491111..1e4a3d6 100644 --- a/src/jlgraphic/message/MessageBroker.ts +++ b/src/jlgraphic/message/MessageBroker.ts @@ -1,13 +1,13 @@ import EventEmitter from 'eventemitter3'; +import { GraphicApp } from '../app'; +import { GraphicState } from '../core'; +import { + IMessageHandler, + IUnsubscriptor, + MessageClientEvents, +} from './BasicMessageClient'; import { CentrifugeMessagingClient } from './CentrifugeBroker'; import { StompMessagingClient } from './WsMsgBroker'; -import { GraphicState } from '../core'; -import { GraphicApp } from '../app'; -import { - ISubscription, - MessageClientEvents, - MessageHandler, -} from './BasicMessageClient'; export enum ClientEngine { Stomp, @@ -31,46 +31,32 @@ export interface MessageCliOption { * 认证token */ token?: string; - // /** - // * 认证失败处理 - // * @returns - // */ - // onAuthenticationFailed?: () => void; - // /** - // * 连接成功处理 - // * @param ctx - // * @returns - // */ - // onConnected?: (ctx: unknown) => void; - // /** - // * 端口连接处理 - // */ - // onDisconnected?: (ctx: unknown) => void; - // // 重连延时,默认3秒,设置为0不重连. - // reconnectDelay?: number; - // // 服务端过来的心跳间隔,默认30秒 - // heartbeatIncoming?: number; - // // 到服务端的心跳间隔,默认30秒 - // heartbeatOutgoing?: number; } -const DefaultStompOption: MessageCliOption = { +export interface CompleteMessageCliOption extends MessageCliOption { + protocol: 'json' | 'protobuf'; +} + +const DefaultStompOption: CompleteMessageCliOption = { engine: ClientEngine.Stomp, protocol: 'protobuf', wsUrl: '', token: '', - // reconnectDelay: 3000, - // heartbeatIncoming: 30000, - // heartbeatOutgoing: 30000, }; export interface IMessageClient extends EventEmitter { /** - * 订阅消息 + * 添加订阅 * @param destination - * @param handle + * @param handler */ - subscribe(destination: string, handle: MessageHandler): ISubscription; + addSubscription(destination: string, handler: IMessageHandler): void; + /** + * 移除订阅 + * @param destination + * @param handler + */ + removeSubscription(destination: string, handler: IMessageHandler): void; /** * 是否已经连接 @@ -85,7 +71,7 @@ export interface IMessageClient extends EventEmitter { export class WsMsgCli { private static client: IMessageClient; - private static options: MessageCliOption; + private static options: CompleteMessageCliOption; private static appMsgBroker: AppWsMsgBroker[] = []; static new(options: MessageCliOption) { if (WsMsgCli.client) { @@ -97,25 +83,10 @@ export class WsMsgCli { WsMsgCli.client = new CentrifugeMessagingClient(WsMsgCli.options); } else { WsMsgCli.client = new StompMessagingClient(WsMsgCli.options); - // WsMsgCli.client.onStompError = (frame: Frame) => { - // const errMsg = frame.headers['message']; - // if (errMsg === '401') { - // console.warn('认证失败,断开WebSocket连接'); - // StompCli.close(); - // if (StompCli.options.onAuthenticationFailed) { - // StompCli.options.onAuthenticationFailed(); - // } - // } else { - // console.error('收到Stomp错误消息', frame); - // } - // }; } const cli = WsMsgCli.client; cli.on('connected', () => { WsMsgCli.emitConnectStateChangeEvent(true); - WsMsgCli.appMsgBroker.forEach((broker) => { - broker.resubscribe(); - }); }); cli.on('disconnected', () => { WsMsgCli.emitConnectStateChangeEvent(false); @@ -132,14 +103,12 @@ export class WsMsgCli { return WsMsgCli.client && WsMsgCli.client.connected; } - static trySubscribe( - destination: string, - handler: MessageHandler - ): ISubscription | undefined { - if (WsMsgCli.isConnected()) { - return WsMsgCli.client.subscribe(destination, handler); - } - return undefined; + static registerSubscription(destination: string, handler: IMessageHandler) { + WsMsgCli.client.addSubscription(destination, handler); + } + + static unregisterSubscription(destination: string, handler: IMessageHandler) { + WsMsgCli.client.removeSubscription(destination, handler); } static registerAppMsgBroker(broker: AppWsMsgBroker) { @@ -193,7 +162,31 @@ export interface AppStateSubscription { * 订阅成功对象,用于取消订阅 * 非客户端使用 */ - subscription?: ISubscription; + subscription?: IUnsubscriptor; +} + +class AppMessageHandler implements IMessageHandler { + app: GraphicApp; + sub: AppStateSubscription; + constructor(app: GraphicApp, subOptions: AppStateSubscription) { + this.app = app; + if (!subOptions.messageConverter && !subOptions.messageHandle) { + throw new Error(`没有消息处理器或图形状态消息转换器: ${subOptions}`); + } + this.sub = subOptions; + } + get App(): GraphicApp { + return this.app; + } + handle(data: any): void { + const sub = this.sub; + if (sub.messageConverter) { + const graphicStates = sub.messageConverter(data); + this.app.handleGraphicStates(graphicStates); + } else if (sub.messageHandle) { + sub.messageHandle(data); + } + } } /** @@ -201,9 +194,9 @@ export interface AppStateSubscription { */ export class AppWsMsgBroker { app: GraphicApp; - subscriptions: Map = new Map< + subscriptions: Map = new Map< string, - AppStateSubscription + AppMessageHandler >(); constructor(app: GraphicApp) { @@ -212,44 +205,21 @@ export class AppWsMsgBroker { } subscribe(sub: AppStateSubscription) { - this.unsbuscribe(sub.destination); // 先尝试取消之前订阅 - sub.subscription = WsMsgCli.trySubscribe(sub.destination, (data) => { - if (sub.messageConverter) { - const graphicStates = sub.messageConverter(data); - this.app.handleGraphicStates(graphicStates); - } else if (sub.messageHandle) { - sub.messageHandle(data); - } else { - console.error( - `订阅destination:${sub.destination}没有消息处理器或图形状态消息转换器` - ); - } - }); - this.subscriptions.set(sub.destination, sub); - } - - /** - * 重连后重新订阅 - */ - resubscribe() { - this.subscriptions.forEach((record) => { - this.subscribe(record); - }); + const handler = new AppMessageHandler(this.app, sub); + WsMsgCli.registerSubscription(sub.destination, handler); + this.subscriptions.set(sub.destination, handler); } unsbuscribe(destination: string) { const oldSub = this.subscriptions.get(destination); if (oldSub) { - if (oldSub.subscription && WsMsgCli.isConnected()) { - oldSub.subscription.unsubscribe(); - } - oldSub.subscription = undefined; + WsMsgCli.unregisterSubscription(destination, oldSub); } } unsbuscribeAll() { - this.subscriptions.forEach((record) => { - this.unsbuscribe(record.destination); + this.subscriptions.forEach((record, destination) => { + this.unsbuscribe(destination); }); } diff --git a/src/jlgraphic/message/WsMsgBroker.ts b/src/jlgraphic/message/WsMsgBroker.ts index c1c252c..ebc7c65 100644 --- a/src/jlgraphic/message/WsMsgBroker.ts +++ b/src/jlgraphic/message/WsMsgBroker.ts @@ -1,20 +1,20 @@ import { Client as StompClient, type Frame } from '@stomp/stompjs'; -import { MessageCliOption } from './MessageBroker'; import { - ISubscription, + HandleMessage, + IUnsubscriptor, MessageClient, - MessageHandler, } from './BasicMessageClient'; +import { CompleteMessageCliOption } from './MessageBroker'; const ReconnectDelay = 3000; const HeartbeatIncoming = 30000; const HeartbeatOutgoing = 30000; export class StompMessagingClient extends MessageClient { - options: MessageCliOption; cli: StompClient; - constructor(options: MessageCliOption) { - super(); + + constructor(options: CompleteMessageCliOption) { + super(options); this.options = options; this.cli = new StompClient({ brokerURL: options.wsUrl, @@ -27,6 +27,9 @@ export class StompMessagingClient extends MessageClient { }); this.cli.onConnect = () => { + // this.subClients.forEach((cli) => { + // this.subscribe(cli.destination, cli.handleMessage); + // }); this.emit('connected', ''); }; this.cli.onStompError = (frame: Frame) => { @@ -55,7 +58,7 @@ export class StompMessagingClient extends MessageClient { return this.cli.connected; } - subscribe(destination: string, handle: MessageHandler): ISubscription { + subscribe(destination: string, handle: HandleMessage): IUnsubscriptor { const sub = this.cli.subscribe( destination, (frame) => { @@ -73,6 +76,10 @@ export class StompMessagingClient extends MessageClient { return sub; } + unsubscribe0(destination: string): void { + this.cli.unsubscribe(destination); + } + close(): void { this.cli.deactivate(); }