From 2255232499c136c80312564cf778e8970d421054 Mon Sep 17 00:00:00 2001 From: joylink_fanyuhong <18706759286@163.com> Date: Sat, 28 Sep 2024 10:04:20 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E6=B7=BB=E5=8A=A0=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/message/MqttBroker.d.ts | 1 + src/app/JlGraphicApp.ts | 11 +++++++++++ src/message/BasicMessageClient.ts | 11 +++++++++++ src/message/MessageBroker.ts | 15 ++++++++++++++- src/message/MqttBroker.ts | 9 +++++++++ src/message/WsMsgBroker.ts | 4 ++++ 6 files changed, 50 insertions(+), 1 deletion(-) diff --git a/lib/message/MqttBroker.d.ts b/lib/message/MqttBroker.d.ts index 1a3f7b5..fb079a4 100644 --- a/lib/message/MqttBroker.d.ts +++ b/lib/message/MqttBroker.d.ts @@ -8,6 +8,7 @@ export declare class MqttMsgClient extends MessageClient { constructor(options: CompleteMessageCliOption); subscribe(destination: string, handle: HandleMessage): boolean; unsubscribe0(destination: string): void; + publishMessage(destination: string, message: string): void; get connected(): boolean; close(): void; } diff --git a/src/app/JlGraphicApp.ts b/src/app/JlGraphicApp.ts index 27aeac4..ca017de 100644 --- a/src/app/JlGraphicApp.ts +++ b/src/app/JlGraphicApp.ts @@ -612,6 +612,10 @@ export interface IGraphicScene extends EventEmitter { * 取消websocket订阅 */ unsubscribe(destination: string): void; + /** + * 发布websocket消息 + */ + publishMessage(destination: string, message: string): void; } abstract class GraphicSceneBase @@ -1158,6 +1162,13 @@ abstract class GraphicSceneBase this.checkWsMsgCli(); this.wsMsgBroker.unsbuscribe(destination); } + /** + * 发布websocket消息 + */ + publishMessage(destination: string, message: string) { + this.checkWsMsgCli(); + this.wsMsgBroker.publishMessage(destination, message); + } /** * 处理图形状态 * @param graphicStates diff --git a/src/message/BasicMessageClient.ts b/src/message/BasicMessageClient.ts index e5ec525..5618d66 100644 --- a/src/message/BasicMessageClient.ts +++ b/src/message/BasicMessageClient.ts @@ -70,6 +70,11 @@ export abstract class MessageClient this.getOrNewSubClient(destination).removeHandler(handle); } + publishMessage(destination: string, message: string): void { + const cli = this.getOrNewSubClient(destination); + cli.publishMessage(destination, message); + } + abstract get connected(): boolean; abstract close(): void; @@ -125,6 +130,12 @@ export class SubscriptionClient { this.mc.unsubscribe(this.destination); } + publishMessage(destination: string, message: string): void { + if (this.mc.connected) { + this.mc.publishMessage(destination, message); + } + } + handleMessage(data: any) { if (this.protocol === 'json') { console.debug('收到消息:', data); diff --git a/src/message/MessageBroker.ts b/src/message/MessageBroker.ts index 85d4408..a060ca7 100644 --- a/src/message/MessageBroker.ts +++ b/src/message/MessageBroker.ts @@ -83,7 +83,12 @@ export interface IMessageClient extends EventEmitter { * @param handler */ removeSubscription(destination: string, handler: IMessageHandler): void; - + /** + * 发布消息 + * @param destination + * @param message + */ + publishMessage(destination: string, message: string): void; /** * 是否已经连接 */ @@ -163,6 +168,10 @@ export class WsMsgCli { WsMsgCli.appMsgBroker.push(broker); } + static publishMessage(destination: string, message: string) { + WsMsgCli.client.publishMessage(destination, message); + } + static removeAppMsgBroker(broker: AppWsMsgBroker) { const index = WsMsgCli.appMsgBroker.findIndex((mb) => mb == broker); if (index >= 0) { @@ -295,6 +304,10 @@ export class AppWsMsgBroker { }); } + publishMessage(destination: string, message: string) { + WsMsgCli.publishMessage(destination, message); + } + /** * 取消所有订阅,从通用Stomp客户端移除此消息代理 */ diff --git a/src/message/MqttBroker.ts b/src/message/MqttBroker.ts index c687141..b876e54 100644 --- a/src/message/MqttBroker.ts +++ b/src/message/MqttBroker.ts @@ -112,4 +112,13 @@ export class MqttMsgClient extends MessageClient { console.warn('MQTT 消息客户端关闭失败', error); } } + publishMessage(destination: string, message: string): void { + console.debug('MQTT发布消息'); + if(this.connected) { + this.cli.publish(destination, message); + }else { + console.warn('MQTT 未连接,消息发布失败'); + } + } } + diff --git a/src/message/WsMsgBroker.ts b/src/message/WsMsgBroker.ts index 9e54f8d..1082f5a 100644 --- a/src/message/WsMsgBroker.ts +++ b/src/message/WsMsgBroker.ts @@ -77,6 +77,10 @@ export class StompMessagingClient extends MessageClient { this.cli.unsubscribe(destination); } + publishMessage(destination: string, message: string): void { + console.debug('MQTT发布消息:未实现'); + } + close(): void { this.cli.deactivate(); }