调整消息插件结构

This commit is contained in:
walker 2023-08-02 15:38:07 +08:00
parent 50f465b24f
commit ef8a0ae988
5 changed files with 225 additions and 147 deletions

View File

@ -533,24 +533,17 @@ export class GraphicApp extends EventEmitter<GraphicAppEvents> {
*/ */
handleGraphicStates(graphicStates: GraphicState[]) { handleGraphicStates(graphicStates: GraphicState[]) {
graphicStates.forEach((state) => { graphicStates.forEach((state) => {
const list = this.queryStore.queryByIdOrCodeAndType( const g = this.queryStore.queryByCodeAndType(
state.code, state.code,
state.graphicType state.graphicType
); );
if (list.length == 0) { if (!g) {
const template = this.getGraphicTemplatesByType(state.graphicType); const template = this.getGraphicTemplatesByType(state.graphicType);
const g = template.new(); const g = template.new();
g.loadState(state); g.loadState(state);
this.addGraphics(g); this.addGraphics(g);
} else { } else if (g.updateStates(state)) {
// 调整逻辑:所有图形对象状态数据更新完后再统一重绘
list
.filter((g) => {
return g.updateStates(state);
})
.forEach((g) => {
g.repaint(); g.repaint();
});
} }
}); });
} }

View File

@ -1,29 +1,37 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import EventEmitter from 'eventemitter3'; import EventEmitter from 'eventemitter3';
import { IMessageClient } from './MessageBroker'; import { GraphicApp } from '../app';
import { CompleteMessageCliOption, IMessageClient } from './MessageBroker';
export type MessageHandler = (data: any) => void;
export interface MessageClientEvents { export interface MessageClientEvents {
connecting: [ctx: any];
connected: [ctx: any]; connected: [ctx: any];
disconnected: [ctx: any]; disconnected: [ctx: any];
} }
export type HandleMessage = (data: any) => void;
export interface IMessageHandler {
/** /**
* * id
*/ */
export interface ISubscription { get App(): GraphicApp;
/** /**
* *
* @param data
*/ */
unsubscribe(): void; handle(data: any): void;
} }
export abstract class MessageClient export abstract class MessageClient
extends EventEmitter<MessageClientEvents> extends EventEmitter<MessageClientEvents>
implements IMessageClient implements IMessageClient
{ {
options: CompleteMessageCliOption;
subClients: SubscriptionClient[] = []; // 订阅客户端
constructor(options: CompleteMessageCliOption) {
super();
this.options = options;
}
/** /**
* *
* @param destination * @param destination
@ -31,10 +39,119 @@ export abstract class MessageClient
*/ */
abstract subscribe( abstract subscribe(
destination: string, destination: string,
handle: MessageHandler handle: HandleMessage
): ISubscription; ): IUnsubscriptor;
unsubscribe(destination: string): void {
this.unsubscribe0(destination);
const idx = this.subClients.findIndex(
(cli) => cli.destination === destination
);
if (idx >= 0) {
this.subClients.splice(idx, 1);
}
}
abstract unsubscribe0(destination: string): void;
getOrNewSubClient(destination: string): SubscriptionClient {
let cli = this.subClients.find((cli) => cli.destination === destination);
if (!cli) {
// 不存在,新建
cli = new SubscriptionClient(this, destination, this.options.protocol);
this.subClients.push(cli);
}
return cli;
}
addSubscription(destination: string, handler: IMessageHandler): void {
const cli = this.getOrNewSubClient(destination);
cli.addHandler(handler);
}
removeSubscription(destination: string, handle: IMessageHandler): void {
this.getOrNewSubClient(destination).removeHandler(handle);
}
abstract get connected(): boolean; abstract get connected(): boolean;
abstract close(): void; abstract close(): void;
} }
/**
*
*/
export interface IUnsubscriptor {
/**
*
*/
unsubscribe(): void;
}
export class SubscriptionClient {
mc: MessageClient;
destination: string;
protocol: 'json' | 'protobuf';
handlers: IMessageHandler[] = [];
unsubscriptor?: IUnsubscriptor;
constructor(
mc: MessageClient,
destination: string,
protocal: 'json' | 'protobuf'
) {
this.mc = mc;
this.destination = destination;
this.protocol = protocal;
this.mc.on('disconnected', this.onDisconnect, this);
this.mc.on('connected', this.trySubscribe, this);
}
addHandler(handler: IMessageHandler) {
if (this.handlers.filter((h) => h.App === handler.App).length == 0) {
this.handlers.push(handler);
}
if (!this.unsubscriptor) {
this.trySubscribe();
}
}
removeHandler(handler: IMessageHandler) {
const idx = this.handlers.findIndex((h) => h.App === handler.App);
if (idx >= 0) {
this.handlers.splice(idx, 1);
}
if (this.handlers.length == 0) {
console.log(`订阅${this.destination}没有消息监听处理,移除订阅`);
this.unsubscribe();
}
}
trySubscribe(): void {
if (this.mc.connected) {
this.unsubscriptor = this.mc.subscribe(
this.destination,
this.handleMessage
);
}
}
unsubscribe(): void {
this.mc.unsubscribe(this.destination);
}
handleMessage(data: any) {
if (this.protocol === 'json') {
console.debug('收到消息:', data);
}
this.handlers.forEach((handler) => {
try {
handler.handle(data);
} catch (error) {
console.error('图形应用状态消息处理异常', error);
}
});
}
onDisconnect() {
this.unsubscriptor = undefined;
}
}

View File

@ -1,17 +1,17 @@
import { State } from 'centrifuge'; import { State, Subscription } from 'centrifuge';
import Centrifuge from 'centrifuge/build/protobuf'; import Centrifuge from 'centrifuge/build/protobuf';
import { MessageCliOption } from './MessageBroker';
import { import {
ISubscription, HandleMessage,
IUnsubscriptor,
MessageClient, MessageClient,
MessageHandler,
} from './BasicMessageClient'; } from './BasicMessageClient';
import { CompleteMessageCliOption } from './MessageBroker';
export class CentrifugeMessagingClient extends MessageClient { export class CentrifugeMessagingClient extends MessageClient {
options: MessageCliOption;
cli: Centrifuge; cli: Centrifuge;
constructor(options: MessageCliOption) {
super(); constructor(options: CompleteMessageCliOption) {
super(options);
this.options = options; this.options = options;
if (this.options.protocol === 'json') { if (this.options.protocol === 'json') {
this.cli = new Centrifuge(options.wsUrl, { this.cli = new Centrifuge(options.wsUrl, {
@ -28,7 +28,6 @@ export class CentrifugeMessagingClient extends MessageClient {
this.cli this.cli
.on('connecting', (ctx) => { .on('connecting', (ctx) => {
console.debug(`centrifuge连接中: ${ctx}, ${ctx.reason}`); console.debug(`centrifuge连接中: ${ctx}, ${ctx.reason}`);
this.emit('connecting', ctx);
}) })
.on('connected', (ctx) => { .on('connected', (ctx) => {
console.debug(`连接成功: ${ctx.transport}`); console.debug(`连接成功: ${ctx.transport}`);
@ -48,31 +47,23 @@ export class CentrifugeMessagingClient extends MessageClient {
return this.cli.state === State.Connected; return this.cli.state === State.Connected;
} }
subscribe(destination: string, handle: MessageHandler): ISubscription { subscribe(destination: string, handle: HandleMessage): IUnsubscriptor {
let sub = this.cli.getSubscription(destination); let sub = this.cli.getSubscription(destination);
if (!sub) { if (!sub) {
sub = this.cli.newSubscription(destination); sub = this.cli.newSubscription(destination);
sub sub.on('publication', (ctx) => {
.on('publication', (ctx) => {
if (this.options.protocol === 'json') {
console.log('收到centrifuge消息:', ctx.data);
}
try {
handle(ctx.data); handle(ctx.data);
} catch (error) { });
console.log('websocket状态消息处理异常', error);
} }
})
.on('subscribed', (ctx) => {
console.log('订阅centrifuge服务消息成功', destination, ctx);
})
.subscribe();
} else {
sub.subscribe(); sub.subscribe();
}
return sub; return sub;
} }
unsubscribe0(destination: string): void {
const subClient = this.getOrNewSubClient(destination);
this.cli.removeSubscription(subClient.unsubscriptor as Subscription);
}
close(): void { close(): void {
this.cli.disconnect(); this.cli.disconnect();
} }

View File

@ -1,13 +1,13 @@
import EventEmitter from 'eventemitter3'; import EventEmitter from 'eventemitter3';
import { GraphicApp } from '../app';
import { GraphicState } from '../core';
import {
IMessageHandler,
IUnsubscriptor,
MessageClientEvents,
} from './BasicMessageClient';
import { CentrifugeMessagingClient } from './CentrifugeBroker'; import { CentrifugeMessagingClient } from './CentrifugeBroker';
import { StompMessagingClient } from './WsMsgBroker'; import { StompMessagingClient } from './WsMsgBroker';
import { GraphicState } from '../core';
import { GraphicApp } from '../app';
import {
ISubscription,
MessageClientEvents,
MessageHandler,
} from './BasicMessageClient';
export enum ClientEngine { export enum ClientEngine {
Stomp, Stomp,
@ -31,46 +31,32 @@ export interface MessageCliOption {
* token * token
*/ */
token?: string; token?: string;
// /**
// * 认证失败处理
// * @returns
// */
// onAuthenticationFailed?: () => void;
// /**
// * 连接成功处理
// * @param ctx
// * @returns
// */
// onConnected?: (ctx: unknown) => void;
// /**
// * 端口连接处理
// */
// onDisconnected?: (ctx: unknown) => void;
// // 重连延时默认3秒,设置为0不重连.
// reconnectDelay?: number;
// // 服务端过来的心跳间隔默认30秒
// heartbeatIncoming?: number;
// // 到服务端的心跳间隔默认30秒
// heartbeatOutgoing?: number;
} }
const DefaultStompOption: MessageCliOption = { export interface CompleteMessageCliOption extends MessageCliOption {
protocol: 'json' | 'protobuf';
}
const DefaultStompOption: CompleteMessageCliOption = {
engine: ClientEngine.Stomp, engine: ClientEngine.Stomp,
protocol: 'protobuf', protocol: 'protobuf',
wsUrl: '', wsUrl: '',
token: '', token: '',
// reconnectDelay: 3000,
// heartbeatIncoming: 30000,
// heartbeatOutgoing: 30000,
}; };
export interface IMessageClient extends EventEmitter<MessageClientEvents> { export interface IMessageClient extends EventEmitter<MessageClientEvents> {
/** /**
* *
* @param destination * @param destination
* @param handle * @param handler
*/ */
subscribe(destination: string, handle: MessageHandler): ISubscription; addSubscription(destination: string, handler: IMessageHandler): void;
/**
*
* @param destination
* @param handler
*/
removeSubscription(destination: string, handler: IMessageHandler): void;
/** /**
* *
@ -85,7 +71,7 @@ export interface IMessageClient extends EventEmitter<MessageClientEvents> {
export class WsMsgCli { export class WsMsgCli {
private static client: IMessageClient; private static client: IMessageClient;
private static options: MessageCliOption; private static options: CompleteMessageCliOption;
private static appMsgBroker: AppWsMsgBroker[] = []; private static appMsgBroker: AppWsMsgBroker[] = [];
static new(options: MessageCliOption) { static new(options: MessageCliOption) {
if (WsMsgCli.client) { if (WsMsgCli.client) {
@ -97,25 +83,10 @@ export class WsMsgCli {
WsMsgCli.client = new CentrifugeMessagingClient(WsMsgCli.options); WsMsgCli.client = new CentrifugeMessagingClient(WsMsgCli.options);
} else { } else {
WsMsgCli.client = new StompMessagingClient(WsMsgCli.options); WsMsgCli.client = new StompMessagingClient(WsMsgCli.options);
// WsMsgCli.client.onStompError = (frame: Frame) => {
// const errMsg = frame.headers['message'];
// if (errMsg === '401') {
// console.warn('认证失败,断开WebSocket连接');
// StompCli.close();
// if (StompCli.options.onAuthenticationFailed) {
// StompCli.options.onAuthenticationFailed();
// }
// } else {
// console.error('收到Stomp错误消息', frame);
// }
// };
} }
const cli = WsMsgCli.client; const cli = WsMsgCli.client;
cli.on('connected', () => { cli.on('connected', () => {
WsMsgCli.emitConnectStateChangeEvent(true); WsMsgCli.emitConnectStateChangeEvent(true);
WsMsgCli.appMsgBroker.forEach((broker) => {
broker.resubscribe();
});
}); });
cli.on('disconnected', () => { cli.on('disconnected', () => {
WsMsgCli.emitConnectStateChangeEvent(false); WsMsgCli.emitConnectStateChangeEvent(false);
@ -132,14 +103,12 @@ export class WsMsgCli {
return WsMsgCli.client && WsMsgCli.client.connected; return WsMsgCli.client && WsMsgCli.client.connected;
} }
static trySubscribe( static registerSubscription(destination: string, handler: IMessageHandler) {
destination: string, WsMsgCli.client.addSubscription(destination, handler);
handler: MessageHandler
): ISubscription | undefined {
if (WsMsgCli.isConnected()) {
return WsMsgCli.client.subscribe(destination, handler);
} }
return undefined;
static unregisterSubscription(destination: string, handler: IMessageHandler) {
WsMsgCli.client.removeSubscription(destination, handler);
} }
static registerAppMsgBroker(broker: AppWsMsgBroker) { static registerAppMsgBroker(broker: AppWsMsgBroker) {
@ -193,7 +162,31 @@ export interface AppStateSubscription {
* *
* 使 * 使
*/ */
subscription?: ISubscription; subscription?: IUnsubscriptor;
}
class AppMessageHandler implements IMessageHandler {
app: GraphicApp;
sub: AppStateSubscription;
constructor(app: GraphicApp, subOptions: AppStateSubscription) {
this.app = app;
if (!subOptions.messageConverter && !subOptions.messageHandle) {
throw new Error(`没有消息处理器或图形状态消息转换器: ${subOptions}`);
}
this.sub = subOptions;
}
get App(): GraphicApp {
return this.app;
}
handle(data: any): void {
const sub = this.sub;
if (sub.messageConverter) {
const graphicStates = sub.messageConverter(data);
this.app.handleGraphicStates(graphicStates);
} else if (sub.messageHandle) {
sub.messageHandle(data);
}
}
} }
/** /**
@ -201,9 +194,9 @@ export interface AppStateSubscription {
*/ */
export class AppWsMsgBroker { export class AppWsMsgBroker {
app: GraphicApp; app: GraphicApp;
subscriptions: Map<string, AppStateSubscription> = new Map< subscriptions: Map<string, AppMessageHandler> = new Map<
string, string,
AppStateSubscription AppMessageHandler
>(); >();
constructor(app: GraphicApp) { constructor(app: GraphicApp) {
@ -212,44 +205,21 @@ export class AppWsMsgBroker {
} }
subscribe(sub: AppStateSubscription) { subscribe(sub: AppStateSubscription) {
this.unsbuscribe(sub.destination); // 先尝试取消之前订阅 const handler = new AppMessageHandler(this.app, sub);
sub.subscription = WsMsgCli.trySubscribe(sub.destination, (data) => { WsMsgCli.registerSubscription(sub.destination, handler);
if (sub.messageConverter) { this.subscriptions.set(sub.destination, handler);
const graphicStates = sub.messageConverter(data);
this.app.handleGraphicStates(graphicStates);
} else if (sub.messageHandle) {
sub.messageHandle(data);
} else {
console.error(
`订阅destination:${sub.destination}没有消息处理器或图形状态消息转换器`
);
}
});
this.subscriptions.set(sub.destination, sub);
}
/**
*
*/
resubscribe() {
this.subscriptions.forEach((record) => {
this.subscribe(record);
});
} }
unsbuscribe(destination: string) { unsbuscribe(destination: string) {
const oldSub = this.subscriptions.get(destination); const oldSub = this.subscriptions.get(destination);
if (oldSub) { if (oldSub) {
if (oldSub.subscription && WsMsgCli.isConnected()) { WsMsgCli.unregisterSubscription(destination, oldSub);
oldSub.subscription.unsubscribe();
}
oldSub.subscription = undefined;
} }
} }
unsbuscribeAll() { unsbuscribeAll() {
this.subscriptions.forEach((record) => { this.subscriptions.forEach((record, destination) => {
this.unsbuscribe(record.destination); this.unsbuscribe(destination);
}); });
} }

View File

@ -1,20 +1,20 @@
import { Client as StompClient, type Frame } from '@stomp/stompjs'; import { Client as StompClient, type Frame } from '@stomp/stompjs';
import { MessageCliOption } from './MessageBroker';
import { import {
ISubscription, HandleMessage,
IUnsubscriptor,
MessageClient, MessageClient,
MessageHandler,
} from './BasicMessageClient'; } from './BasicMessageClient';
import { CompleteMessageCliOption } from './MessageBroker';
const ReconnectDelay = 3000; const ReconnectDelay = 3000;
const HeartbeatIncoming = 30000; const HeartbeatIncoming = 30000;
const HeartbeatOutgoing = 30000; const HeartbeatOutgoing = 30000;
export class StompMessagingClient extends MessageClient { export class StompMessagingClient extends MessageClient {
options: MessageCliOption;
cli: StompClient; cli: StompClient;
constructor(options: MessageCliOption) {
super(); constructor(options: CompleteMessageCliOption) {
super(options);
this.options = options; this.options = options;
this.cli = new StompClient({ this.cli = new StompClient({
brokerURL: options.wsUrl, brokerURL: options.wsUrl,
@ -27,6 +27,9 @@ export class StompMessagingClient extends MessageClient {
}); });
this.cli.onConnect = () => { this.cli.onConnect = () => {
// this.subClients.forEach((cli) => {
// this.subscribe(cli.destination, cli.handleMessage);
// });
this.emit('connected', ''); this.emit('connected', '');
}; };
this.cli.onStompError = (frame: Frame) => { this.cli.onStompError = (frame: Frame) => {
@ -55,7 +58,7 @@ export class StompMessagingClient extends MessageClient {
return this.cli.connected; return this.cli.connected;
} }
subscribe(destination: string, handle: MessageHandler): ISubscription { subscribe(destination: string, handle: HandleMessage): IUnsubscriptor {
const sub = this.cli.subscribe( const sub = this.cli.subscribe(
destination, destination,
(frame) => { (frame) => {
@ -73,6 +76,10 @@ export class StompMessagingClient extends MessageClient {
return sub; return sub;
} }
unsubscribe0(destination: string): void {
this.cli.unsubscribe(destination);
}
close(): void { close(): void {
this.cli.deactivate(); this.cli.deactivate();
} }