mqtt添加发布消息

This commit is contained in:
joylink_fanyuhong 2024-09-28 10:04:20 +08:00
parent 8b0ad14f73
commit 2255232499
6 changed files with 50 additions and 1 deletions

View File

@ -8,6 +8,7 @@ export declare class MqttMsgClient extends MessageClient {
constructor(options: CompleteMessageCliOption); constructor(options: CompleteMessageCliOption);
subscribe(destination: string, handle: HandleMessage): boolean; subscribe(destination: string, handle: HandleMessage): boolean;
unsubscribe0(destination: string): void; unsubscribe0(destination: string): void;
publishMessage(destination: string, message: string): void;
get connected(): boolean; get connected(): boolean;
close(): void; close(): void;
} }

View File

@ -612,6 +612,10 @@ export interface IGraphicScene extends EventEmitter<GraphicAppEvents> {
* websocket订阅 * websocket订阅
*/ */
unsubscribe(destination: string): void; unsubscribe(destination: string): void;
/**
* websocket消息
*/
publishMessage(destination: string, message: string): void;
} }
abstract class GraphicSceneBase abstract class GraphicSceneBase
@ -1158,6 +1162,13 @@ abstract class GraphicSceneBase
this.checkWsMsgCli(); this.checkWsMsgCli();
this.wsMsgBroker.unsbuscribe(destination); this.wsMsgBroker.unsbuscribe(destination);
} }
/**
* websocket消息
*/
publishMessage(destination: string, message: string) {
this.checkWsMsgCli();
this.wsMsgBroker.publishMessage(destination, message);
}
/** /**
* *
* @param graphicStates * @param graphicStates

View File

@ -70,6 +70,11 @@ export abstract class MessageClient
this.getOrNewSubClient(destination).removeHandler(handle); this.getOrNewSubClient(destination).removeHandler(handle);
} }
publishMessage(destination: string, message: string): void {
const cli = this.getOrNewSubClient(destination);
cli.publishMessage(destination, message);
}
abstract get connected(): boolean; abstract get connected(): boolean;
abstract close(): void; abstract close(): void;
@ -125,6 +130,12 @@ export class SubscriptionClient {
this.mc.unsubscribe(this.destination); this.mc.unsubscribe(this.destination);
} }
publishMessage(destination: string, message: string): void {
if (this.mc.connected) {
this.mc.publishMessage(destination, message);
}
}
handleMessage(data: any) { handleMessage(data: any) {
if (this.protocol === 'json') { if (this.protocol === 'json') {
console.debug('收到消息:', data); console.debug('收到消息:', data);

View File

@ -83,7 +83,12 @@ export interface IMessageClient extends EventEmitter<MessageClientEvents> {
* @param handler * @param handler
*/ */
removeSubscription(destination: string, handler: IMessageHandler): void; removeSubscription(destination: string, handler: IMessageHandler): void;
/**
*
* @param destination
* @param message
*/
publishMessage(destination: string, message: string): void;
/** /**
* *
*/ */
@ -163,6 +168,10 @@ export class WsMsgCli {
WsMsgCli.appMsgBroker.push(broker); WsMsgCli.appMsgBroker.push(broker);
} }
static publishMessage(destination: string, message: string) {
WsMsgCli.client.publishMessage(destination, message);
}
static removeAppMsgBroker(broker: AppWsMsgBroker) { static removeAppMsgBroker(broker: AppWsMsgBroker) {
const index = WsMsgCli.appMsgBroker.findIndex((mb) => mb == broker); const index = WsMsgCli.appMsgBroker.findIndex((mb) => mb == broker);
if (index >= 0) { if (index >= 0) {
@ -295,6 +304,10 @@ export class AppWsMsgBroker {
}); });
} }
publishMessage(destination: string, message: string) {
WsMsgCli.publishMessage(destination, message);
}
/** /**
* Stomp客户端移除此消息代理 * Stomp客户端移除此消息代理
*/ */

View File

@ -112,4 +112,13 @@ export class MqttMsgClient extends MessageClient {
console.warn('MQTT 消息客户端关闭失败', error); console.warn('MQTT 消息客户端关闭失败', error);
} }
} }
publishMessage(destination: string, message: string): void {
console.debug('MQTT发布消息');
if(this.connected) {
this.cli.publish(destination, message);
}else {
console.warn('MQTT 未连接,消息发布失败');
} }
}
}

View File

@ -77,6 +77,10 @@ export class StompMessagingClient extends MessageClient {
this.cli.unsubscribe(destination); this.cli.unsubscribe(destination);
} }
publishMessage(destination: string, message: string): void {
console.debug('MQTT发布消息:未实现');
}
close(): void { close(): void {
this.cli.deactivate(); this.cli.deactivate();
} }