代码同步

This commit is contained in:
fan 2023-07-31 16:58:06 +08:00
parent 4adaffd770
commit 35ddeb91c4
4 changed files with 58 additions and 34 deletions

@ -1 +1 @@
Subproject commit e521b07d86e7ac10bc6cb96288ae025b677485f7
Subproject commit d7379d4406889053bdf764eff8cd83779bb9d751

View File

@ -1,5 +1,5 @@
import { Centrifuge, State } from 'centrifuge';
import CentrifugeProtobuf from 'centrifuge/build/protobuf';
import { State } from 'centrifuge';
import Centrifuge from 'centrifuge/build/protobuf';
import { MessageCliOption } from './MessageBroker';
import {
ISubscription,
@ -19,7 +19,7 @@ export class CentrifugeMessagingClient extends MessageClient {
protocol: options.protocol,
});
} else {
this.cli = new CentrifugeProtobuf(options.wsUrl, {
this.cli = new Centrifuge(options.wsUrl, {
token: options.token,
protocol: options.protocol,
});
@ -55,7 +55,11 @@ export class CentrifugeMessagingClient extends MessageClient {
if (this.options.protocol === 'json') {
console.log('收到centrifuge消息:', ctx.data);
}
handle(ctx.data);
try {
handle(ctx.data);
} catch (error) {
console.log('websocket状态消息处理异常', error);
}
})
.on('subscribed', (ctx) => {
console.log('订阅centrifuge服务消息成功', destination, ctx);

View File

@ -31,24 +31,27 @@ export interface MessageCliOption {
* token
*/
token?: string;
/**
*
* @returns
*/
onAuthenticationFailed?: () => void;
/**
*
* @param ctx
* @returns
*/
onConnected?: (ctx: unknown) => void;
/**
*
*/
onDisconnected?: (ctx: unknown) => void;
reconnectDelay?: number; // 重连延时默认3秒,设置为0不重连.
heartbeatIncoming?: number; // 服务端过来的心跳间隔默认30秒
heartbeatOutgoing?: number; // 到服务端的心跳间隔默认30秒
// /**
// * 认证失败处理
// * @returns
// */
// onAuthenticationFailed?: () => void;
// /**
// * 连接成功处理
// * @param ctx
// * @returns
// */
// onConnected?: (ctx: unknown) => void;
// /**
// * 端口连接处理
// */
// onDisconnected?: (ctx: unknown) => void;
// // 重连延时默认3秒,设置为0不重连.
// reconnectDelay?: number;
// // 服务端过来的心跳间隔默认30秒
// heartbeatIncoming?: number;
// // 到服务端的心跳间隔默认30秒
// heartbeatOutgoing?: number;
}
const DefaultStompOption: MessageCliOption = {
@ -56,9 +59,9 @@ const DefaultStompOption: MessageCliOption = {
protocol: 'protobuf',
wsUrl: '',
token: '',
reconnectDelay: 3000,
heartbeatIncoming: 30000,
heartbeatOutgoing: 30000,
// reconnectDelay: 3000,
// heartbeatIncoming: 30000,
// heartbeatOutgoing: 30000,
};
export interface IMessageClient extends EventEmitter<MessageClientEvents> {
@ -69,8 +72,14 @@ export interface IMessageClient extends EventEmitter<MessageClientEvents> {
*/
subscribe(destination: string, handle: MessageHandler): ISubscription;
/**
*
*/
get connected(): boolean;
/**
*
*/
close(): void;
}
@ -104,6 +113,9 @@ export class WsMsgCli {
const cli = WsMsgCli.client;
cli.on('connected', () => {
WsMsgCli.emitConnectStateChangeEvent(true);
WsMsgCli.appMsgBroker.forEach((broker) => {
broker.resubscribe();
});
});
cli.on('disconnected', () => {
WsMsgCli.emitConnectStateChangeEvent(false);
@ -123,11 +135,11 @@ export class WsMsgCli {
static trySubscribe(
destination: string,
handler: MessageHandler
): ISubscription {
return WsMsgCli.client.subscribe(destination, handler);
// if (WsMsgCli.isConnected()) {
// }
// return undefined;
): ISubscription | undefined {
if (WsMsgCli.isConnected()) {
return WsMsgCli.client.subscribe(destination, handler);
}
return undefined;
}
static registerAppMsgBroker(broker: AppWsMsgBroker) {
@ -216,6 +228,9 @@ export class AppWsMsgBroker {
this.subscriptions.set(sub.destination, sub);
}
/**
*
*/
resubscribe() {
this.subscriptions.forEach((record) => {
this.subscribe(record);

View File

@ -6,6 +6,10 @@ import {
MessageHandler,
} from './BasicMessageClient';
const ReconnectDelay = 3000;
const HeartbeatIncoming = 30000;
const HeartbeatOutgoing = 30000;
export class StompMessagingClient extends MessageClient {
options: MessageCliOption;
cli: StompClient;
@ -17,9 +21,9 @@ export class StompMessagingClient extends MessageClient {
connectHeaders: {
Authorization: options.token ? options.token : '',
},
reconnectDelay: options.reconnectDelay,
heartbeatIncoming: options.heartbeatIncoming,
heartbeatOutgoing: options.heartbeatOutgoing,
reconnectDelay: ReconnectDelay,
heartbeatIncoming: HeartbeatIncoming,
heartbeatOutgoing: HeartbeatOutgoing,
});
this.cli.onConnect = () => {
@ -29,6 +33,7 @@ export class StompMessagingClient extends MessageClient {
const errMsg = frame.headers['message'];
if (errMsg === '401') {
console.warn('认证失败,断开WebSocket连接');
this.cli.deactivate();
} else {
console.error('收到Stomp错误消息', frame);
}