Merge branch 'master' of https://git.code.tencent.com/xian-ncc-da/xian-ncc-da-server
This commit is contained in:
commit
484076b249
@ -1,29 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import club.joylink.xiannccda.util.JsonUtils;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 报警处理器
|
||||
*/
|
||||
public interface AlertHandler<A extends AlertInfo, P> {
|
||||
/**
|
||||
* 获取该处理器可以处理的报警类型
|
||||
*/
|
||||
Class<A> getHandleableClass();
|
||||
|
||||
Class<P> getUpdateParamClass();
|
||||
|
||||
default boolean update(AlertInfo alert, Map<String, Object> param) {
|
||||
if (alert.getClass() == getHandleableClass()) {
|
||||
P p = JsonUtils.read(JsonUtils.writeValueAsString(param), getUpdateParamClass());
|
||||
update(getHandleableClass().cast(alert), p);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void update(A alert, P param);
|
||||
}
|
@ -30,8 +30,4 @@ public interface AlertInfo {
|
||||
*/
|
||||
String getInfo();
|
||||
|
||||
/**
|
||||
* 报警当前阶段
|
||||
*/
|
||||
AlertStage getStage();
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import club.joylink.xiannccda.core.EventEmitter;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 事件任务管理器
|
||||
*/
|
||||
public class AlertManager extends EventEmitter {
|
||||
|
||||
private static final Map<String, AlertManager> MANAGER_MAP = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 报警监控任务(循环监测式报警)
|
||||
*/
|
||||
Map<String, AlertMonitoringTask> amtMap = new ConcurrentHashMap<>();
|
||||
/**
|
||||
* 任务执行线程池
|
||||
*/
|
||||
ScheduledExecutorService Executor = Executors.newScheduledThreadPool(1);
|
||||
boolean started = false;
|
||||
final int interval = 100; // 执行器默认间隔,单位ms
|
||||
|
||||
private AlertManager(String id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
public synchronized static AlertManager getInstance(String id) {
|
||||
return MANAGER_MAP.computeIfAbsent(id, k -> new AlertManager(id));
|
||||
}
|
||||
|
||||
public synchronized static AlertManager removeInstance(String id) {
|
||||
return MANAGER_MAP.remove(id);
|
||||
}
|
||||
|
||||
public void addTask(AlertMonitoringTask... tasks) {
|
||||
if (tasks != null) {
|
||||
for (AlertMonitoringTask task : tasks) {
|
||||
amtMap.put(task.getName(), task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void removeTask(String name) {
|
||||
amtMap.remove(name);
|
||||
}
|
||||
|
||||
public synchronized void taskStart() {
|
||||
if (!started) {
|
||||
started = true;
|
||||
Executor.scheduleAtFixedRate(() -> amtMap.values().forEach(AlertMonitoringTask::run),
|
||||
0, interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,37 +6,10 @@ import java.util.function.Supplier;
|
||||
/**
|
||||
* 报警监测任务
|
||||
*/
|
||||
public interface AlertMonitoringTask<D> {
|
||||
public interface AlertMonitoringTask {
|
||||
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* 设置任务所需信息的提供者
|
||||
*/
|
||||
void setInfoSupplier(Supplier<D> supplier);
|
||||
|
||||
/**
|
||||
* 获取任务所需信息的提供者
|
||||
*/
|
||||
Supplier<D> getInfoSupplier();
|
||||
|
||||
/**
|
||||
* 获取任务所需信息
|
||||
*/
|
||||
default D getInfo(){
|
||||
return getInfoSupplier().get();
|
||||
};
|
||||
|
||||
/**
|
||||
* 设置满足报警条件时的回调函数
|
||||
*/
|
||||
void setCallBack(Consumer<AlertInfo> callBack);
|
||||
|
||||
/**
|
||||
* 获取满足报警条件时的回调函数
|
||||
*/
|
||||
Consumer<AlertInfo> getCallBack();
|
||||
|
||||
/**
|
||||
* 任务执行逻辑
|
||||
*/
|
||||
|
@ -1,23 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import club.joylink.xiannccda.util.JsonUtils;
|
||||
import java.util.Map;
|
||||
|
||||
public interface AlertProcessHandler <A extends AlertInfo, AP extends AlertStage, P> {
|
||||
Class<A> getAlertClass();
|
||||
AP getAlertStage();
|
||||
Class<P> getParamClass();
|
||||
|
||||
default void accept(AlertInfo alertInfo, Map<String, Object> param) {
|
||||
if (alertInfo.getClass() == getAlertClass()) {
|
||||
if (param == null) {
|
||||
accept(getAlertClass().cast(alertInfo), getAlertStage(), null);
|
||||
} else if (getParamClass() != null){
|
||||
P p = JsonUtils.read(JsonUtils.writeValueAsString(param), getParamClass());
|
||||
accept(getAlertClass().cast(alertInfo), getAlertStage(), p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void accept(A alert, AP alertStage, P param);
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Service
|
||||
public class AlertProcessService {
|
||||
|
||||
private final HashBasedTable<Class<? extends AlertInfo>, AlertStage, AlertProcessHandler<?, ?, ?>> table = HashBasedTable.create();
|
||||
|
||||
@Autowired
|
||||
public AlertProcessService(List<AlertProcessHandler<?, ?, ?>> handlerList) {
|
||||
if (!CollectionUtils.isEmpty(handlerList)) {
|
||||
handlerList.forEach(handler -> {
|
||||
Class<? extends AlertInfo> alertClass = handler.getAlertClass();
|
||||
AlertStage alertStage = handler.getAlertStage();
|
||||
table.put(alertClass, alertStage, handler);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void update(AlertInfo alertInfo, AlertStage alertStage, Map<String, Object> param) {
|
||||
AlertProcessHandler<?, ?, ?> handler = table.get(alertInfo.getClass(), alertStage);
|
||||
if (handler != null) {
|
||||
handler.accept(alertInfo, param);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import club.joylink.xiannccda.util.JsonUtils;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Service
|
||||
public class AlertService {
|
||||
|
||||
private final Map<Class<?>, AlertHandler<?, ?>> handlerMap;
|
||||
|
||||
public AlertService(List<AlertHandler<?, ?>> handlerList) {
|
||||
if (!CollectionUtils.isEmpty(handlerList)) {
|
||||
this.handlerMap = handlerList.stream()
|
||||
.collect(Collectors.toMap(AlertHandler::getHandleableClass, Function.identity()));
|
||||
} else {
|
||||
handlerMap = new HashMap<>();
|
||||
}
|
||||
}
|
||||
|
||||
public void update(String sid, String alertId, Map<String, Object> param) {
|
||||
EventTaskManager alertManager = EventTaskManager.getInstance(sid);
|
||||
AlertInfo alert = alertManager.getAlert(alertId);
|
||||
update(alert, param);
|
||||
}
|
||||
|
||||
private void update(AlertInfo alert, Map<String, Object> params) {
|
||||
AlertHandler<?, ?> handler = handlerMap.get(alert.getClass());
|
||||
if (handler != null) {
|
||||
handler.update(alert, params);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,9 +1,11 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import club.joylink.xiannccda.core.Listener;
|
||||
|
||||
/**
|
||||
* 报警源头事件监听器
|
||||
*/
|
||||
public interface AlertSourceEventListener<AE extends AlertSourceEvent> {
|
||||
public interface AlertSourceEventListener<AE extends AlertSourceEvent> extends Listener<AE> {
|
||||
|
||||
void accept(AE event);
|
||||
}
|
||||
|
@ -1,155 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.core;
|
||||
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3Alert;
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3Alert.Stage;
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 事件任务管理器
|
||||
*/
|
||||
public class EventTaskManager {
|
||||
|
||||
private static final Map<String, EventTaskManager> MANAGER_MAP = new HashMap<>();
|
||||
|
||||
private String id;
|
||||
|
||||
/**
|
||||
* 报警主题监听Map(事件触发式报警)
|
||||
*/
|
||||
Map<String, List<AlertSourceEventListener>> AlertConsumerMap = new ConcurrentHashMap<>();
|
||||
/**
|
||||
* 报警监控任务(循环监测式报警)
|
||||
*/
|
||||
List<AlertMonitoringTask> Amts = new ArrayList<>();
|
||||
Map<String, AlertMonitoringTask> amtMap = new ConcurrentHashMap<>();
|
||||
/**
|
||||
* 任务执行线程池
|
||||
*/
|
||||
ScheduledExecutorService Executor = Executors.newScheduledThreadPool(1);
|
||||
boolean started = false;
|
||||
final int interval = 100; // 执行器默认间隔,单位ms
|
||||
|
||||
//------------------------- 报警后 -----------------------------------
|
||||
private Map<String, AlertInfo> alertMap = new HashMap<>();
|
||||
|
||||
private final HashBasedTable<Class<? extends AlertInfo>, AlertStage, AlertProcessHandler<?, ?, ?>> handlerTable = HashBasedTable.create();
|
||||
|
||||
private EventTaskManager(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public static EventTaskManager newInstance(String id) {
|
||||
return MANAGER_MAP.compute(id, (k, v) -> new EventTaskManager(id));
|
||||
}
|
||||
|
||||
public static EventTaskManager getInstance(String id) {
|
||||
return MANAGER_MAP.computeIfAbsent(id, k -> new EventTaskManager(id));
|
||||
}
|
||||
|
||||
public void registerAlertMonitoringTask(AlertMonitoringTask task) {
|
||||
Amts.add(task);
|
||||
taskStart();
|
||||
}
|
||||
|
||||
public void unregisterAlertMonitoringTask(AlertMonitoringTask task) {
|
||||
Amts.remove(task);
|
||||
}
|
||||
|
||||
synchronized void taskStart() {
|
||||
if (!started) {
|
||||
started = true;
|
||||
Executor.scheduleAtFixedRate(() -> Amts.forEach(AlertMonitoringTask::run),
|
||||
0, interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定报警主题监听器
|
||||
*
|
||||
* @param topic
|
||||
* @return
|
||||
*/
|
||||
public List<AlertSourceEventListener> getAlertConsumers(String topic) {
|
||||
List<AlertSourceEventListener> list = AlertConsumerMap.computeIfAbsent(
|
||||
topic, k -> new ArrayList<>());
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加指定报警主题监听器
|
||||
*
|
||||
* @param topic
|
||||
* @param consumer
|
||||
*/
|
||||
public void on(String topic, AlertSourceEventListener consumer) {
|
||||
List<AlertSourceEventListener> list = getAlertConsumers(topic);
|
||||
list.add(consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除指定报警主题监听器
|
||||
*
|
||||
* @param topic
|
||||
* @param listener
|
||||
*/
|
||||
public void off(String topic, AlertSourceEventListener listener) {
|
||||
List<AlertSourceEventListener> list = getAlertConsumers(topic);
|
||||
list.remove(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用指定报警主题监听器
|
||||
*
|
||||
* @param topic
|
||||
* @param event
|
||||
*/
|
||||
public void emit(String topic, AlertSourceEvent event) {
|
||||
List<AlertSourceEventListener> list = getAlertConsumers(topic);
|
||||
list.forEach(consumer -> consumer.accept(event));
|
||||
}
|
||||
|
||||
public void registerAlertHandler(AlertProcessHandler<?, ?, ?> alertProcessHandler) {
|
||||
AlertStage alertStage = alertProcessHandler.getAlertStage();
|
||||
handlerTable.put(alertProcessHandler.getAlertClass(), alertStage, alertProcessHandler);
|
||||
}
|
||||
|
||||
public void unregisterAlertHandler(Class<?> alertClass, AlertStage alertStage) {
|
||||
handlerTable.remove(alertClass, alertStage);
|
||||
}
|
||||
|
||||
public void alertOccurred(AlertInfo alertInfo) {
|
||||
alertFlow(alertInfo, Stage.OCCURRED, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 报警信息流转
|
||||
*/
|
||||
public void alertFlow(AlertInfo alert, AlertStage alertStage, Map<String, Object> param) {
|
||||
AlertProcessHandler<?, ?, ?> handler = handlerTable.get(alert.getClass(),
|
||||
alertStage);
|
||||
if (handler != null) {
|
||||
handler.accept(alert, param);
|
||||
}
|
||||
}
|
||||
|
||||
public AlertInfo getAlert(String alertId) {
|
||||
return alertMap.get(alertId);
|
||||
}
|
||||
|
||||
public <E extends AlertInfo> E getAlert(String alertId, Class<E> cls) {
|
||||
AlertInfo alert = alertMap.get(alertId);
|
||||
if (cls.isInstance(alert)) {
|
||||
return cls.cast(alert);
|
||||
}
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.xian3;
|
||||
|
||||
import club.joylink.xiannccda.alert.core.AlertHandler;
|
||||
import java.util.Map;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 西安3报警处理器
|
||||
*/
|
||||
@Component
|
||||
public class Xian3AlertHandler implements AlertHandler<Xian3Alert, Xian3AlertParam> {
|
||||
|
||||
@Override
|
||||
public Class<Xian3Alert> getHandleableClass() {
|
||||
return Xian3Alert.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<Xian3AlertParam> getUpdateParamClass() {
|
||||
return Xian3AlertParam.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(Xian3Alert alert, Xian3AlertParam param) {
|
||||
|
||||
}
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.xian3;
|
||||
|
||||
import club.joylink.xiannccda.alert.core.AlertProcessHandler;
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3Alert.Stage;
|
||||
|
||||
public class Xian3AlertOccurredProcessHandler implements AlertProcessHandler<Xian3Alert, Stage, Xian3AlertParam> {
|
||||
|
||||
@Override
|
||||
public Class<Xian3Alert> getAlertClass() {
|
||||
return Xian3Alert.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stage getAlertStage() {
|
||||
return Stage.OCCURRED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<Xian3AlertParam> getParamClass() {
|
||||
return Xian3AlertParam.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(Xian3Alert alert, Stage alertStage, Xian3AlertParam param) {
|
||||
System.out.println("西安三故障发生");
|
||||
}
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
package club.joylink.xiannccda.alert.xian3;
|
||||
|
||||
public class Xian3AlertParam {
|
||||
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package club.joylink.xiannccda.alert.xian3;
|
||||
|
||||
import club.joylink.xiannccda.alert.core.AlertInfo;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
public class Xian3SupplyShortageAlert implements AlertInfo {
|
||||
private String id;
|
||||
|
||||
public Xian3SupplyShortageAlert(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLevel() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalDateTime getAlertTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInfo() {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -7,16 +7,14 @@ import java.time.LocalDateTime;
|
||||
/**
|
||||
* 西安三号线报警
|
||||
*/
|
||||
public class Xian3Alert implements AlertInfo {
|
||||
public class Xian3TrainDelayAlert implements AlertInfo {
|
||||
private String id;
|
||||
private Stage stage;
|
||||
private String level;
|
||||
private LocalDateTime alertTime;
|
||||
private String info;
|
||||
|
||||
public Xian3Alert(String id) {
|
||||
public Xian3TrainDelayAlert(String id) {
|
||||
this.id = id;
|
||||
this.stage = Stage.OCCURRED;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -38,16 +36,4 @@ public class Xian3Alert implements AlertInfo {
|
||||
public String getInfo() {
|
||||
return info;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlertStage getStage() {
|
||||
return stage;
|
||||
}
|
||||
|
||||
public enum Stage implements AlertStage {
|
||||
OCCURRED,
|
||||
CONFIRM,
|
||||
PROCESSING,
|
||||
RESOLVED,
|
||||
}
|
||||
}
|
58
src/main/java/club/joylink/xiannccda/core/EventEmitter.java
Normal file
58
src/main/java/club/joylink/xiannccda/core/EventEmitter.java
Normal file
@ -0,0 +1,58 @@
|
||||
package club.joylink.xiannccda.core;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.Getter;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* 事件总线。
|
||||
* <p>
|
||||
* 负责事件的监听器的管理和事件的发布
|
||||
*/
|
||||
public class EventEmitter {
|
||||
|
||||
private Map<Class<?>, List<Listener<?>>> listenerMap = new HashMap<>();
|
||||
|
||||
@Getter
|
||||
private String id;
|
||||
|
||||
public EventEmitter(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public <E> void addListener(Listener<E> listener) {
|
||||
Type genericSuperclass = listener.getClass().getGenericInterfaces()[0];
|
||||
Type type = ((ParameterizedType) genericSuperclass).getActualTypeArguments()[0];
|
||||
Class<?> eventType = (Class<?>) type;
|
||||
List<Listener<?>> listeners = listenerMap.computeIfAbsent(eventType, K -> new ArrayList<>());
|
||||
listeners.add(listener);
|
||||
}
|
||||
|
||||
public <E> void removeListeners(Class<E> eventType) {
|
||||
listenerMap.remove(eventType);
|
||||
}
|
||||
|
||||
public <E> void removeListener(Listener<E> listener) {
|
||||
Type genericSuperclass = listener.getClass().getGenericInterfaces()[0];
|
||||
Type type = ((ParameterizedType) genericSuperclass).getActualTypeArguments()[0];
|
||||
Class<?> eventType = (Class<?>) type;
|
||||
List<Listener<?>> listeners = listenerMap.get(eventType);
|
||||
if (!CollectionUtils.isEmpty(listeners)) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
}
|
||||
|
||||
public <E> void publish(E event) {
|
||||
List<Listener<?>> listeners = listenerMap.get(event.getClass());
|
||||
if (!CollectionUtils.isEmpty(listeners)) {
|
||||
listeners.forEach(listener -> ((Listener<E>) listener).accept(event));
|
||||
}
|
||||
}
|
||||
}
|
6
src/main/java/club/joylink/xiannccda/core/Listener.java
Normal file
6
src/main/java/club/joylink/xiannccda/core/Listener.java
Normal file
@ -0,0 +1,6 @@
|
||||
package club.joylink.xiannccda.core;
|
||||
|
||||
public interface Listener<E> {
|
||||
|
||||
void accept(E event);
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package club.joylink.xiannccda.ws;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -22,10 +23,9 @@ public interface IMessageServer {
|
||||
Object onSubscription(String destination, Map<String, String> paramMap);
|
||||
|
||||
/**
|
||||
* 发送消息间隔时间
|
||||
* 发送消息间隔时间,单位ms
|
||||
*
|
||||
* @return 0为不定时发送
|
||||
* @throws IllegalArgumentException 小于0时
|
||||
* @return <=0为不定时发送
|
||||
*/
|
||||
int getInterval();
|
||||
|
||||
@ -34,6 +34,6 @@ public interface IMessageServer {
|
||||
*
|
||||
* @return null为不发送
|
||||
*/
|
||||
Object onTick();
|
||||
List<TopicMessage> onTick();
|
||||
|
||||
}
|
||||
|
@ -6,6 +6,8 @@ import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
|
||||
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage.Builder;
|
||||
import club.joylink.xiannccda.exception.BusinessExceptionAssertEnum;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ -13,6 +15,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class TestMessageServer implements IMessageServer {
|
||||
|
||||
static final String LineIdPathKey = "lineId";
|
||||
static final String PathPrefix = "/queue/line";
|
||||
static final String PathPattern = String.format("/queue/line/{%s}", LineIdPathKey);
|
||||
|
||||
@Override
|
||||
@ -37,11 +40,14 @@ public class TestMessageServer implements IMessageServer {
|
||||
|
||||
@Override
|
||||
public int getInterval() {
|
||||
return 0;
|
||||
return 1000;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object onTick() {
|
||||
return null;
|
||||
public List<TopicMessage> onTick() {
|
||||
List<TopicMessage> topicMessages = new ArrayList<>();
|
||||
topicMessages.add(new TopicMessage(String.format("%s/%s", PathPrefix, 1), "线路1数据"));
|
||||
topicMessages.add(new TopicMessage(String.format("%s/%s", PathPrefix, 2), "线路2数据"));
|
||||
return topicMessages;
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,8 @@ import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
|
||||
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage.Builder;
|
||||
import club.joylink.xiannccda.exception.BusinessExceptionAssertEnum;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ -43,11 +45,14 @@ public class TestMessageServer2 implements IMessageServer {
|
||||
|
||||
@Override
|
||||
public int getInterval() {
|
||||
return 0;
|
||||
return 500;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object onTick() {
|
||||
return null;
|
||||
public List<TopicMessage> onTick() {
|
||||
List<TopicMessage> topicMessages = new ArrayList<>();
|
||||
topicMessages.add(
|
||||
new TopicMessage(this.getDestinationPattern(), String.format("线路%s数据", this.lineId)));
|
||||
return topicMessages;
|
||||
}
|
||||
}
|
||||
|
15
src/main/java/club/joylink/xiannccda/ws/TopicMessage.java
Normal file
15
src/main/java/club/joylink/xiannccda/ws/TopicMessage.java
Normal file
@ -0,0 +1,15 @@
|
||||
package club.joylink.xiannccda.ws;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public class TopicMessage {
|
||||
|
||||
String destination;
|
||||
Object msg;
|
||||
|
||||
public TopicMessage(String destination, Object msg) {
|
||||
this.destination = destination;
|
||||
this.msg = msg;
|
||||
}
|
||||
}
|
@ -4,6 +4,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.messaging.Message;
|
||||
@ -19,6 +22,7 @@ import org.springframework.web.socket.messaging.SessionSubscribeEvent;
|
||||
public class WsMessageServerManager {
|
||||
|
||||
private SimpMessagingTemplate smt;
|
||||
static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
public WsMessageServerManager(SimpMessagingTemplate smt) {
|
||||
this.smt = smt;
|
||||
@ -32,6 +36,16 @@ public class WsMessageServerManager {
|
||||
|
||||
void registerMessageServer(IMessageServer messageServer) {
|
||||
messageServerMap.put(messageServer.getDestinationPattern(), messageServer);
|
||||
if (messageServer.getInterval() > 0) {
|
||||
EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
|
||||
List<TopicMessage> topicMessages = messageServer.onTick();
|
||||
for (TopicMessage topicMessage : topicMessages) {
|
||||
if (topicMessage.getMsg() != null) {
|
||||
smt.convertAndSend(topicMessage.destination, topicMessage.msg);
|
||||
}
|
||||
}
|
||||
}, messageServer.getInterval(), messageServer.getInterval(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
String getDestinationFromStompSubscribeMessage(Message<?> message) {
|
||||
@ -47,8 +61,8 @@ public class WsMessageServerManager {
|
||||
@EventListener
|
||||
public void onSubscription(SessionSubscribeEvent subscribeEvent) {
|
||||
String destination = getDestinationFromStompSubscribeMessage(subscribeEvent.getMessage());
|
||||
log.info("订阅事件: destination={}, user={}", destination, Objects.requireNonNull(
|
||||
subscribeEvent.getUser()).getName());
|
||||
// log.info("订阅事件: destination={}, user={}", destination, Objects.requireNonNull(
|
||||
// subscribeEvent.getUser()).getName());
|
||||
|
||||
messageServerMap.forEach((dest, server) -> {
|
||||
if (MATCHER.match(server.getDestinationPattern(), destination)) {
|
||||
|
@ -0,0 +1,90 @@
|
||||
package club.joylink.xiannccda.alert;
|
||||
|
||||
import club.joylink.xiannccda.alert.core.AlertManager;
|
||||
import club.joylink.xiannccda.alert.core.AlertSourceEvent;
|
||||
import club.joylink.xiannccda.alert.core.AlertMonitoringTask;
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3SupplyShortageAlert;
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3TrainDelayAlert;
|
||||
import club.joylink.xiannccda.core.Listener;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.Getter;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class AlertManagerTest {
|
||||
|
||||
@Test
|
||||
void testEvent() {
|
||||
String sid = "xian3";
|
||||
AlertManager manager = AlertManager.getInstance(sid);
|
||||
|
||||
//可能报警事件循环监测任务
|
||||
manager.addTask(new TrainDelayAlertMonitoringTask(sid));
|
||||
//可能报警事件监听器
|
||||
manager.addListener(new SuppliesRemainInsufficientListener(sid));
|
||||
//报警事件监听器
|
||||
manager.addListener((Listener<Xian3TrainDelayAlert>) event -> System.out.println("列车延误报警"));
|
||||
manager.addListener((Listener<Xian3SupplyShortageAlert>) event -> System.out.println("物资紧缺报警"));
|
||||
|
||||
manager.taskStart();
|
||||
manager.publish(new SuppliesCountUpdatedEvent(this, 2));
|
||||
}
|
||||
|
||||
static class TrainDelayAlertMonitoringTask implements AlertMonitoringTask {
|
||||
|
||||
String sId;
|
||||
List<String> trains = new ArrayList<>();
|
||||
List<Integer> trips = new ArrayList<>();
|
||||
|
||||
public TrainDelayAlertMonitoringTask(String sId) {
|
||||
this.sId = sId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "Train_Delay";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
AlertManager alertManager = AlertManager.getInstance(sId);
|
||||
alertManager.publish(new Xian3TrainDelayAlert("1"));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class SuppliesCountUpdatedEvent extends AlertSourceEvent {
|
||||
|
||||
@Getter
|
||||
final int remain;
|
||||
|
||||
public SuppliesCountUpdatedEvent(Object source, int remain) {
|
||||
super(source);
|
||||
this.remain = remain;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 应急物资匮乏报警
|
||||
*/
|
||||
static class SuppliesRemainInsufficientListener implements Listener<SuppliesCountUpdatedEvent> {
|
||||
|
||||
String sId;
|
||||
|
||||
public SuppliesRemainInsufficientListener(String sId) {
|
||||
this.sId = sId;
|
||||
}
|
||||
|
||||
int SuppliesAlertMin = 10;
|
||||
|
||||
@Override
|
||||
public void accept(SuppliesCountUpdatedEvent event) {
|
||||
if (event.getRemain() <= SuppliesAlertMin) {
|
||||
AlertManager.getInstance(sId).publish(new Xian3SupplyShortageAlert("2"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,114 +0,0 @@
|
||||
package club.joylink.xiannccda.alert;
|
||||
|
||||
import club.joylink.xiannccda.alert.core.AlertInfo;
|
||||
import club.joylink.xiannccda.alert.core.EventTaskManager;
|
||||
import club.joylink.xiannccda.alert.core.AlertSourceEvent;
|
||||
import club.joylink.xiannccda.alert.core.AlertSourceEventListener;
|
||||
import club.joylink.xiannccda.alert.core.AlertMonitoringTask;
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3Alert;
|
||||
import club.joylink.xiannccda.alert.xian3.Xian3AlertOccurredProcessHandler;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class EventTaskManagerTest {
|
||||
|
||||
@Test
|
||||
void testEvent() {
|
||||
EventTaskManager manager = EventTaskManager.getInstance("id");
|
||||
List<String> list = List.of("1", "2", "3");
|
||||
|
||||
//报警处理
|
||||
manager.registerAlertHandler(new Xian3AlertOccurredProcessHandler());
|
||||
|
||||
//报警事件监测
|
||||
TrainDelayAlertMonitoringTask task = new TrainDelayAlertMonitoringTask();
|
||||
task.setInfoSupplier(() -> list);
|
||||
task.setCallBack(manager::alertOccurred);
|
||||
manager.registerAlertMonitoringTask(task);
|
||||
|
||||
SuppliesRemainInsufficientAlert.init(); // 注册
|
||||
// 发布,此时监听会相应
|
||||
manager.emit("supplies remain update", new SuppliesCountUpdatedEvent(this, 1));
|
||||
SuppliesRemainInsufficientAlert.clear(); // 移除
|
||||
// 发布,没有监听,不会处理
|
||||
manager.emit("supplies remain update", new SuppliesCountUpdatedEvent(this, 2));
|
||||
|
||||
|
||||
}
|
||||
|
||||
static class TrainDelayAlertMonitoringTask implements AlertMonitoringTask<List<String>> {
|
||||
private Supplier<List<String>> supplier;
|
||||
private Consumer<AlertInfo> callBack;
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "Train_Delay";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInfoSupplier(Supplier<List<String>> supplier) {
|
||||
this.supplier = supplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<List<String>> getInfoSupplier() {
|
||||
return supplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCallBack(Consumer<AlertInfo> callBack) {
|
||||
this.callBack = callBack;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer<AlertInfo> getCallBack() {
|
||||
return callBack;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
List<String> info = getInfo();
|
||||
if (info.contains("1")) {
|
||||
getCallBack().accept(new Xian3Alert("xian3"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class SuppliesCountUpdatedEvent extends AlertSourceEvent {
|
||||
|
||||
final int remain;
|
||||
|
||||
public SuppliesCountUpdatedEvent(Object source, int remain) {
|
||||
super(source);
|
||||
this.remain = remain;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 应急物资匮乏报警
|
||||
*/
|
||||
class SuppliesRemainInsufficientAlert{
|
||||
|
||||
static final int SuppliesAlertMin = 10;
|
||||
static AlertSourceEventListener<SuppliesCountUpdatedEvent> listener;
|
||||
|
||||
static void init() {
|
||||
listener = (AlertSourceEventListener<SuppliesCountUpdatedEvent>) event -> {
|
||||
if (event.remain < SuppliesAlertMin) {
|
||||
// 构建并发布物资匮乏报警
|
||||
System.out.println("物资匮乏报警");
|
||||
}
|
||||
};
|
||||
EventTaskManager.getInstance("id").on("supplies remain update", listener);
|
||||
}
|
||||
|
||||
static void clear() {
|
||||
EventTaskManager.getInstance("id").off("supplies remain update", listener);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user