修改TopicMessage中的data类型为byte[];修改监听器框架会触发事件的父类及接口的监听器;修改西安NCC报警信息通过ws发送时的数据格式

This commit is contained in:
joylink_zhangsai 2023-06-30 15:14:09 +08:00
parent 500f680d73
commit e29c999288
16 changed files with 2119 additions and 93 deletions

View File

@ -1,6 +1,7 @@
package club.joylink.xiannccda.alert;
import club.joylink.xiannccda.alert.core.AlertManager;
import club.joylink.xiannccda.event.Listener;
import club.joylink.xiannccda.ws.WsMessageServerManager;
import club.joylink.xiannccda.ws.XianNccAlertMessageServer;
import org.springframework.boot.ApplicationArguments;
@ -19,9 +20,23 @@ public class AlertJob implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
//添加报警事件监听器
addAlertListeners();
//注册西安NCC的报警ws消息发送服务
wsMessageServerManager.registerMessageServer(XianNccAlertMessageServer.getInstance());
//启动报警源事件监测任务
AlertManager.getInstance().taskStart();
}
public void addAlertListeners() {
XianNccAlertMessageServer xianNccAlertMessageServer = XianNccAlertMessageServer.getInstance();
AlertManager alertManager = AlertManager.getInstance();
//报警监听
alertManager.on(new Listener<XianNccAlertInfo>() {
@Override
public void accept(XianNccAlertInfo event) {
xianNccAlertMessageServer.addMsg(event);
}
});
}
}

View File

@ -1,7 +1,5 @@
package club.joylink.xiannccda.alert.xian3;
package club.joylink.xiannccda.alert;
import club.joylink.xiannccda.alert.AlertType;
import club.joylink.xiannccda.alert.core.AlertInfo;
import java.time.LocalDateTime;
import lombok.Builder;
import lombok.Data;
@ -12,7 +10,7 @@ import lombok.NonNull;
*/
@Builder
@Data
public class Xian3TrainDelayAlert implements AlertInfo {
public class TrainDelayAlert implements XianNccAlertInfo {
@NonNull
private String id;
@ -41,6 +39,11 @@ public class Xian3TrainDelayAlert implements AlertInfo {
return info;
}
@Override
public Integer getAlertTipId() {
return alertTipId;
}
public AlertType getAlertType() {
return AlertType.XIAN3_TRAIN_DELAY;
}

View File

@ -0,0 +1,8 @@
package club.joylink.xiannccda.alert;
import club.joylink.xiannccda.alert.core.AlertInfo;
public interface XianNccAlertInfo extends AlertInfo {
Integer getAlertTipId();
}

View File

@ -1,19 +1,17 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.event.EventEmitter;
import club.joylink.xiannccda.exception.BusinessExceptionAssertEnum;
import com.google.common.eventbus.EventBus;
import java.security.Key;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
* 事件任务管理器
*/
@Slf4j
public class AlertManager extends EventEmitter {
private static final Map<String, AlertManager> MANAGER_MAP = new ConcurrentHashMap<>();
@ -67,7 +65,7 @@ public class AlertManager extends EventEmitter {
try {
alertMonitoringTask.run();
} catch (Throwable t) {
t.printStackTrace();
log.error("【报警监测任务循环逻辑出现异常】", t);
}
}),
0, interval, TimeUnit.MILLISECONDS);

View File

@ -1,15 +1,7 @@
package club.joylink.xiannccda.alert.xian3;
import club.joylink.xiannccda.alert.core.AlertManager;
import club.joylink.xiannccda.alert.AlertTipLocationType;
import club.joylink.xiannccda.alert.AlertTipTimeType;
import club.joylink.xiannccda.entity.AlertTip;
import club.joylink.xiannccda.event.Listener;
import club.joylink.xiannccda.repository.IAlertTipRepository;
import club.joylink.xiannccda.ws.XianNccAlertMessageServer;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import java.time.LocalTime;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
@ -27,33 +19,8 @@ public class Xian3AlertJob implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
XianNccAlertMessageServer xianNccAlertMessageServer = XianNccAlertMessageServer.getInstance();
AlertManager alertManager = AlertManager.getInstance();
//报警源
alertManager.addTask(new Xian3TrainDelayAlertMonitoringTask());
//报警监听
alertManager.on(new Listener<Xian3TrainDelayAlert>() {
@Override
public void accept(Xian3TrainDelayAlert event) {
LambdaQueryWrapper<AlertTip> queryWrapper = Wrappers.lambdaQuery(AlertTip.class);
queryWrapper.eq(AlertTip::getAlertType, event.getAlertType().name())
.eq(AlertTip::getLocationType, AlertTipLocationType.QX.name());
LocalTime start = LocalTime.of(7, 0, 0);
LocalTime end = LocalTime.of(9, 0, 0);
LocalTime now = LocalTime.now();
if (!now.isBefore(start) && !now.isAfter(end)) {
queryWrapper.eq(AlertTip::getTimeType, AlertTipTimeType.CLOCK_7_9.name());
} else {
queryWrapper.isNull(AlertTip::getTimeType);
}
AlertTip alertTip = alertTipRepository.getOne(queryWrapper);
if (alertTip != null) {
event.setAlertTipId(alertTip.getId());
}
xianNccAlertMessageServer.addMsg(event);
}
});
}
}

View File

@ -1,11 +1,12 @@
package club.joylink.xiannccda.alert.xian3;
import club.joylink.xiannccda.alert.TrainDelayAlert;
import club.joylink.xiannccda.alert.core.AlertManager;
import club.joylink.xiannccda.alert.core.AlertMonitoringTask;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;
class Xian3TrainDelayAlertMonitoringTask implements AlertMonitoringTask {
public class Xian3TrainDelayAlertMonitoringTask implements AlertMonitoringTask {
private final AtomicInteger id = new AtomicInteger(1);
@ -29,12 +30,13 @@ class Xian3TrainDelayAlertMonitoringTask implements AlertMonitoringTask {
} else {
if (now.getSecond() % 10 == 0) {
AlertManager alertManager = AlertManager.getInstance();
Xian3TrainDelayAlert alert = Xian3TrainDelayAlert.builder()
TrainDelayAlert alert = TrainDelayAlert.builder()
.id(String.valueOf(id.getAndIncrement()))
.level("YELLOW")
.alertTime(now)
.info(String.format("[3号线]列车[01-1001]按计划应于%s抵达[%s],现因[%s]晚点%s分钟",
now.minusMinutes(2), "鱼化寨", "道岔P0110失表", "2"))
.alertTipId(1)
.build();
alertManager.emit(alert);
alertTriggered = true;

View File

@ -16,4 +16,8 @@ public enum CoordinateSystem {
* 正线
*/
MAIN_LINE,
/**
* 换乘
*/
TRANSFER,
}

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,8 @@ public class EventEmitter {
private Map<Class<?>, List<Listener<?>>> listenerMap = new HashMap<>();
@Getter private String id;
@Getter
private String id;
public EventEmitter(String id) {
this.id = id;
@ -47,9 +48,40 @@ public class EventEmitter {
}
public <E> void emit(E event) {
List<Listener<?>> listeners = listenerMap.get(event.getClass());
// 调用监听该事件的监听器
Class<?> aClass = event.getClass();
List<Listener<?>> listeners = listenerMap.get(aClass);
if (!CollectionUtils.isEmpty(listeners)) {
listeners.forEach(listener -> ((Listener<E>) listener).accept(event));
}
// 调用监听该事件实现的接口的监听器
callInterfaceListener(aClass, event);
// 调用监听该事件继承的父类的监听器
callSuperClassListener(aClass, event);
}
private <E> void callInterfaceListener(Class<?> aClass, E event) {
for (Class<?> anInterface : aClass.getInterfaces()) {
callInterfaceListener(anInterface, event);
List<Listener<?>> superClassListeners = listenerMap.get(anInterface);
if (!CollectionUtils.isEmpty(superClassListeners)) {
for (Listener<?> listener : superClassListeners) {
((Listener) listener).accept(anInterface.cast(event));
}
}
}
}
private <E> void callSuperClassListener(Class<?> aClass, E event) {
Class<?> superclass = aClass.getSuperclass();
while (superclass != null) {
List<Listener<?>> superClassListeners = listenerMap.get(superclass);
if (!CollectionUtils.isEmpty(superClassListeners)) {
for (Listener<?> listener : superClassListeners) {
((Listener) listener).accept(superclass.cast(event));
}
}
superclass = superclass.getSuperclass();
}
}
}

View File

@ -1,6 +1,5 @@
package club.joylink.xiannccda.ws;
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataRepository;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Rtu;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Signal;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
@ -47,8 +46,8 @@ public class TestMessageServer implements IMessageServer {
@Override
public List<TopicMessage> onTick() {
List<TopicMessage> topicMessages = new ArrayList<>();
topicMessages.add(new TopicMessage(String.format("%s/%s", PathPrefix, 1), "线路1数据"));
topicMessages.add(new TopicMessage(String.format("%s/%s", PathPrefix, 2), "线路2数据"));
topicMessages.add(new TopicMessage(String.format("%s/%s", PathPrefix, 1), "线路1数据".getBytes()));
topicMessages.add(new TopicMessage(String.format("%s/%s", PathPrefix, 2), "线路2数据".getBytes()));
return topicMessages;
}
}

View File

@ -51,7 +51,7 @@ public class TestMessageServer2 implements IMessageServer {
public List<TopicMessage> onTick() {
List<TopicMessage> topicMessages = new ArrayList<>();
topicMessages.add(
new TopicMessage(this.getDestinationPattern(), String.format("线路%s数据", this.lineId)));
new TopicMessage(this.getDestinationPattern(), String.format("线路%s数据", this.lineId).getBytes()));
return topicMessages;
}
}

View File

@ -6,9 +6,9 @@ import lombok.Getter;
public class TopicMessage {
String destination;
Object msg;
byte[] msg;
public TopicMessage(String destination, Object msg) {
public TopicMessage(String destination, byte[] msg) {
this.destination = destination;
this.msg = msg;
}

View File

@ -1,9 +1,10 @@
package club.joylink.xiannccda.ws;
import club.joylink.xiannccda.alert.core.AlertInfo;
import club.joylink.xiannccda.alert.core.AlertManager;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter.Feature;
import club.joylink.xiannccda.alert.XianNccAlertInfo;
import club.joylink.xiannccda.dto.protos.AlertInfoProto.AlertInfoMessage;
import club.joylink.xiannccda.dto.protos.AlertInfoProto.AlertInfoMessage.Builder;
import club.joylink.xiannccda.dto.protos.AlertInfoProto.AlertInfoMessage.Message;
import com.google.protobuf.Int32Value;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@ -20,8 +21,8 @@ public class XianNccAlertMessageServer implements IMessageServer {
private String id;
private final String Destination = "/queue/xian/ncc/alert";
private final LinkedHashMap<String, AlertInfo> allMsg = new LinkedHashMap<>();
Queue<AlertInfo> pendingMsgQueue = new ConcurrentLinkedQueue<>();
private final LinkedHashMap<String, AlertInfoMessage.Message> allMsg = new LinkedHashMap<>();
Queue<XianNccAlertInfo> pendingMsgQueue = new ConcurrentLinkedQueue<>();
@Override
public String getDestinationPattern() {
@ -30,7 +31,11 @@ public class XianNccAlertMessageServer implements IMessageServer {
@Override
public Object onSubscription(String destination, Map<String, String> paramMap) {
return allMsg.values();
byte[] bytes = AlertInfoMessage.newBuilder()
.addAllMessages(allMsg.values())
.build()
.toByteArray();
return bytes.length == 0 ? null : bytes;
}
@Override
@ -40,17 +45,22 @@ public class XianNccAlertMessageServer implements IMessageServer {
@Override
public List<TopicMessage> onTick() {
List<TopicMessage> topicMessageList = new ArrayList<>();
AlertInfo alertInfo;
Builder builder = AlertInfoMessage.newBuilder();
XianNccAlertInfo alertInfo;
for (int i = 0; i < 10; i++) { //加循环次数主要是为了防止while死循环
if ((alertInfo = pendingMsgQueue.poll()) != null) {
allMsg.put(alertInfo.getId(), alertInfo);
topicMessageList.add(new TopicMessage(Destination, alertInfo));
Message message = convertToMessage(alertInfo);
allMsg.put(message.getId(), message);
builder.addMessages(message);
} else {
break;
}
}
return topicMessageList;
byte[] bytes = builder.build().toByteArray();
if (bytes != null && bytes.length != 0) {
return List.of(new TopicMessage(getDestinationPattern(), bytes));
}
return new ArrayList<>();
}
private XianNccAlertMessageServer(String id) {
@ -69,7 +79,20 @@ public class XianNccAlertMessageServer implements IMessageServer {
return SERVER_MAP.remove(id);
}
public void addMsg(AlertInfo alertInfo) {
public void addMsg(XianNccAlertInfo alertInfo) {
pendingMsgQueue.add(alertInfo);
}
private AlertInfoMessage.Message convertToMessage(XianNccAlertInfo alertInfo) {
Message.Builder builder = Message.newBuilder()
.setId(alertInfo.getId())
.setLevel(alertInfo.getLevel())
.setAlertTime(alertInfo.getAlertTime().toString())
.setInfo(alertInfo.getInfo());
Integer alertTipId = alertInfo.getAlertTipId();
if (alertTipId != null) {
builder.setAlertTipId(Int32Value.newBuilder().setValue(alertTipId));
}
return builder.build();
}
}

View File

@ -1,45 +1,40 @@
package club.joylink.xiannccda;
import com.alibaba.fastjson2.support.spring6.http.converter.FastJsonHttpMessageConverter;
import com.alibaba.fastjson2.support.spring6.messaging.converter.MappingFastJsonMessageConverter;
import com.alibaba.fastjson2.support.spring6.websocket.sockjs.FastjsonSockJsMessageCodec;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import lombok.NonNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import club.joylink.xiannccda.dto.protos.AlertInfoProto.AlertInfoMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import java.lang.reflect.Type;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import java.lang.reflect.Type;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WebSocketTest {
@Test
public void testWebSocketSubscription() throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
stompClient.setMessageConverter(new MappingFastJsonMessageConverter());
stompClient.setMessageConverter(new ByteArrayMessageConverter());
String webSocketUrl = "ws://localhost:9081/ws-default";
CountDownLatch latch = new CountDownLatch(1);
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.set("Authorization", "Bearer YourAccessTokenHere");
headers.set("Authorization",
"Bearer eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJzZWxmIiwic3ViIjoiMTQiLCJleHAiOjE2ODgzNDY3ODgsImlhdCI6MTY4ODA4NzU4OH0.CCAHMqzzVXDYlvIBYhOBT92_MaYTSUDMmPwkzFDMRXR_LSidyVkm3o5SmqaiORjTgx_B0kxgwgpDGdXs3A_tmDrHolIUCvSaiez-hK3v30Z3Z4DhxtE8073tAqGdM-uU1eXTGC7zm20tM5riDe_Qy5PF5lf9qe9ty0Y06s62FNRNcyCBw-DuhkQcSkWfcMiUHKYVge2weO5Mzm1nD-1yViI359-smYU5eYJInVBdOaBvOECQ4OF8GJQ9rfgktPJAUsrXT0bf8cHp-fUUSvuNv5QUVmg2j-tZDIjhlN3OkXpHDiXuMPnx_lHVS95CO0metFozU2OT7uYukV5Z6RN4JA");
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.set("Authorization",
"Bearer eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJzZWxmIiwic3ViIjoiMTQiLCJleHAiOjE2ODgzNDY3ODgsImlhdCI6MTY4ODA4NzU4OH0.CCAHMqzzVXDYlvIBYhOBT92_MaYTSUDMmPwkzFDMRXR_LSidyVkm3o5SmqaiORjTgx_B0kxgwgpDGdXs3A_tmDrHolIUCvSaiez-hK3v30Z3Z4DhxtE8073tAqGdM-uU1eXTGC7zm20tM5riDe_Qy5PF5lf9qe9ty0Y06s62FNRNcyCBw-DuhkQcSkWfcMiUHKYVge2weO5Mzm1nD-1yViI359-smYU5eYJInVBdOaBvOECQ4OF8GJQ9rfgktPJAUsrXT0bf8cHp-fUUSvuNv5QUVmg2j-tZDIjhlN3OkXpHDiXuMPnx_lHVS95CO0metFozU2OT7uYukV5Z6RN4JA");
StompSessionHandlerAdapter sessionHandler = new StompSessionHandlerAdapter() {
@Override
@ -59,14 +54,19 @@ public class WebSocketTest {
@Override
public void handleFrame(StompHeaders headers, Object payload) {
String message = new String((byte[]) payload, StandardCharsets.UTF_8);
AlertInfoMessage message = null;
try {
message = AlertInfoMessage.parseFrom((byte[]) payload);
System.out.println("Received message: " + message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
});
}
};
stompClient.connectAsync(webSocketUrl, sessionHandler);
stompClient.connectAsync(webSocketUrl, headers, stompHeaders, sessionHandler);
assertTrue(latch.await(20, TimeUnit.SECONDS));
}

View File

@ -1,10 +1,9 @@
package club.joylink.xiannccda.alert;
import club.joylink.xiannccda.alert.core.AlertManager;
import club.joylink.xiannccda.alert.core.AlertSourceEvent;
import club.joylink.xiannccda.alert.core.AlertMonitoringTask;
import club.joylink.xiannccda.alert.core.AlertSourceEvent;
import club.joylink.xiannccda.alert.xian3.Xian3SupplyShortageAlert;
import club.joylink.xiannccda.alert.xian3.Xian3TrainDelayAlert;
import club.joylink.xiannccda.event.Listener;
import java.util.ArrayList;
import java.util.List;
@ -23,7 +22,7 @@ public class AlertManagerTest {
//可能报警事件监听器
manager.on(new SuppliesRemainInsufficientListener(sid));
//报警事件监听器
manager.on((Listener<Xian3TrainDelayAlert>) event -> System.out.println("列车延误报警"));
manager.on((Listener<TrainDelayAlert>) event -> System.out.println("列车延误报警"));
manager.on((Listener<Xian3SupplyShortageAlert>) event -> System.out.println("物资紧缺报警"));
manager.taskStart();
@ -48,7 +47,7 @@ public class AlertManagerTest {
@Override
public void run() {
AlertManager alertManager = AlertManager.getInstance(sId);
alertManager.emit(Xian3TrainDelayAlert.builder().id("1").build());
alertManager.emit(TrainDelayAlert.builder().id("1").build());
}
}

@ -1 +1 @@
Subproject commit 9a7b4b7a3a912970197155a149a9da463d2a68fc
Subproject commit 2b9ad05b202b3b90c3f64b6b8c8a56ec83716f08