事件框架v0.2

This commit is contained in:
joylink_zhangsai 2023-06-16 17:51:37 +08:00
parent 98a4ba7571
commit b50c73f30e
18 changed files with 261 additions and 500 deletions

View File

@ -0,0 +1,60 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.core.EventEmitter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 事件任务管理器
*/
public class AlertEventEmitter extends EventEmitter {
private static final Map<String, AlertEventEmitter> MANAGER_MAP = new HashMap<>();
/**
* 报警监控任务(循环监测式报警)
*/
Map<String, AlertMonitoringTask> amtMap = new ConcurrentHashMap<>();
/**
* 任务执行线程池
*/
ScheduledExecutorService Executor = Executors.newScheduledThreadPool(1);
boolean started = false;
final int interval = 100; // 执行器默认间隔单位ms
private AlertEventEmitter(String id) {
super(id);
}
public synchronized static AlertEventEmitter getInstance(String id) {
return MANAGER_MAP.computeIfAbsent(id, k -> new AlertEventEmitter(id));
}
public synchronized static AlertEventEmitter removeInstance(String id) {
return MANAGER_MAP.remove(id);
}
public void addTask(AlertMonitoringTask... tasks) {
if (tasks != null) {
for (AlertMonitoringTask task : tasks) {
amtMap.put(task.getName(), task);
}
}
}
public void removeTask(String name) {
amtMap.remove(name);
}
public synchronized void taskStart() {
if (!started) {
started = true;
Executor.scheduleAtFixedRate(() -> amtMap.values().forEach(AlertMonitoringTask::run),
0, interval, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -1,29 +0,0 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.util.JsonUtils;
import java.util.List;
import java.util.Map;
/**
* 报警处理器
*/
public interface AlertHandler<A extends AlertInfo, P> {
/**
* 获取该处理器可以处理的报警类型
*/
Class<A> getHandleableClass();
Class<P> getUpdateParamClass();
default boolean update(AlertInfo alert, Map<String, Object> param) {
if (alert.getClass() == getHandleableClass()) {
P p = JsonUtils.read(JsonUtils.writeValueAsString(param), getUpdateParamClass());
update(getHandleableClass().cast(alert), p);
return true;
} else {
return false;
}
}
void update(A alert, P param);
}

View File

@ -30,8 +30,4 @@ public interface AlertInfo {
*/
String getInfo();
/**
* 报警当前阶段
*/
AlertStage getStage();
}

View File

@ -6,37 +6,10 @@ import java.util.function.Supplier;
/**
* 报警监测任务
*/
public interface AlertMonitoringTask<D> {
public interface AlertMonitoringTask {
String getName();
/**
* 设置任务所需信息的提供者
*/
void setInfoSupplier(Supplier<D> supplier);
/**
* 获取任务所需信息的提供者
*/
Supplier<D> getInfoSupplier();
/**
* 获取任务所需信息
*/
default D getInfo(){
return getInfoSupplier().get();
};
/**
* 设置满足报警条件时的回调函数
*/
void setCallBack(Consumer<AlertInfo> callBack);
/**
* 获取满足报警条件时的回调函数
*/
Consumer<AlertInfo> getCallBack();
/**
* 任务执行逻辑
*/

View File

@ -1,23 +0,0 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.util.JsonUtils;
import java.util.Map;
public interface AlertProcessHandler <A extends AlertInfo, AP extends AlertStage, P> {
Class<A> getAlertClass();
AP getAlertStage();
Class<P> getParamClass();
default void accept(AlertInfo alertInfo, Map<String, Object> param) {
if (alertInfo.getClass() == getAlertClass()) {
if (param == null) {
accept(getAlertClass().cast(alertInfo), getAlertStage(), null);
} else if (getParamClass() != null){
P p = JsonUtils.read(JsonUtils.writeValueAsString(param), getParamClass());
accept(getAlertClass().cast(alertInfo), getAlertStage(), p);
}
}
}
void accept(A alert, AP alertStage, P param);
}

View File

@ -1,33 +0,0 @@
package club.joylink.xiannccda.alert.core;
import com.google.common.collect.HashBasedTable;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@Service
public class AlertProcessService {
private final HashBasedTable<Class<? extends AlertInfo>, AlertStage, AlertProcessHandler<?, ?, ?>> table = HashBasedTable.create();
@Autowired
public AlertProcessService(List<AlertProcessHandler<?, ?, ?>> handlerList) {
if (!CollectionUtils.isEmpty(handlerList)) {
handlerList.forEach(handler -> {
Class<? extends AlertInfo> alertClass = handler.getAlertClass();
AlertStage alertStage = handler.getAlertStage();
table.put(alertClass, alertStage, handler);
});
}
}
private void update(AlertInfo alertInfo, AlertStage alertStage, Map<String, Object> param) {
AlertProcessHandler<?, ?, ?> handler = table.get(alertInfo.getClass(), alertStage);
if (handler != null) {
handler.accept(alertInfo, param);
}
}
}

View File

@ -1,38 +0,0 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.util.JsonUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@Service
public class AlertService {
private final Map<Class<?>, AlertHandler<?, ?>> handlerMap;
public AlertService(List<AlertHandler<?, ?>> handlerList) {
if (!CollectionUtils.isEmpty(handlerList)) {
this.handlerMap = handlerList.stream()
.collect(Collectors.toMap(AlertHandler::getHandleableClass, Function.identity()));
} else {
handlerMap = new HashMap<>();
}
}
public void update(String sid, String alertId, Map<String, Object> param) {
EventTaskManager alertManager = EventTaskManager.getInstance(sid);
AlertInfo alert = alertManager.getAlert(alertId);
update(alert, param);
}
private void update(AlertInfo alert, Map<String, Object> params) {
AlertHandler<?, ?> handler = handlerMap.get(alert.getClass());
if (handler != null) {
handler.update(alert, params);
}
}
}

View File

@ -1,9 +1,11 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.core.Listener;
/**
* 报警源头事件监听器
*/
public interface AlertSourceEventListener<AE extends AlertSourceEvent> {
public interface AlertSourceEventListener<AE extends AlertSourceEvent> extends Listener<AE> {
void accept(AE event);
}

View File

@ -1,155 +0,0 @@
package club.joylink.xiannccda.alert.core;
import club.joylink.xiannccda.alert.xian3.Xian3Alert;
import club.joylink.xiannccda.alert.xian3.Xian3Alert.Stage;
import com.google.common.collect.HashBasedTable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 事件任务管理器
*/
public class EventTaskManager {
private static final Map<String, EventTaskManager> MANAGER_MAP = new HashMap<>();
private String id;
/**
* 报警主题监听Map(事件触发式报警)
*/
Map<String, List<AlertSourceEventListener>> AlertConsumerMap = new ConcurrentHashMap<>();
/**
* 报警监控任务(循环监测式报警)
*/
List<AlertMonitoringTask> Amts = new ArrayList<>();
Map<String, AlertMonitoringTask> amtMap = new ConcurrentHashMap<>();
/**
* 任务执行线程池
*/
ScheduledExecutorService Executor = Executors.newScheduledThreadPool(1);
boolean started = false;
final int interval = 100; // 执行器默认间隔单位ms
//------------------------- 报警后 -----------------------------------
private Map<String, AlertInfo> alertMap = new HashMap<>();
private final HashBasedTable<Class<? extends AlertInfo>, AlertStage, AlertProcessHandler<?, ?, ?>> handlerTable = HashBasedTable.create();
private EventTaskManager(String id) {
this.id = id;
}
public static EventTaskManager newInstance(String id) {
return MANAGER_MAP.compute(id, (k, v) -> new EventTaskManager(id));
}
public static EventTaskManager getInstance(String id) {
return MANAGER_MAP.computeIfAbsent(id, k -> new EventTaskManager(id));
}
public void registerAlertMonitoringTask(AlertMonitoringTask task) {
Amts.add(task);
taskStart();
}
public void unregisterAlertMonitoringTask(AlertMonitoringTask task) {
Amts.remove(task);
}
synchronized void taskStart() {
if (!started) {
started = true;
Executor.scheduleAtFixedRate(() -> Amts.forEach(AlertMonitoringTask::run),
0, interval, TimeUnit.MILLISECONDS);
}
}
/**
* 获取指定报警主题监听器
*
* @param topic
* @return
*/
public List<AlertSourceEventListener> getAlertConsumers(String topic) {
List<AlertSourceEventListener> list = AlertConsumerMap.computeIfAbsent(
topic, k -> new ArrayList<>());
return list;
}
/**
* 添加指定报警主题监听器
*
* @param topic
* @param consumer
*/
public void on(String topic, AlertSourceEventListener consumer) {
List<AlertSourceEventListener> list = getAlertConsumers(topic);
list.add(consumer);
}
/**
* 移除指定报警主题监听器
*
* @param topic
* @param listener
*/
public void off(String topic, AlertSourceEventListener listener) {
List<AlertSourceEventListener> list = getAlertConsumers(topic);
list.remove(listener);
}
/**
* 调用指定报警主题监听器
*
* @param topic
* @param event
*/
public void emit(String topic, AlertSourceEvent event) {
List<AlertSourceEventListener> list = getAlertConsumers(topic);
list.forEach(consumer -> consumer.accept(event));
}
public void registerAlertHandler(AlertProcessHandler<?, ?, ?> alertProcessHandler) {
AlertStage alertStage = alertProcessHandler.getAlertStage();
handlerTable.put(alertProcessHandler.getAlertClass(), alertStage, alertProcessHandler);
}
public void unregisterAlertHandler(Class<?> alertClass, AlertStage alertStage) {
handlerTable.remove(alertClass, alertStage);
}
public void alertOccurred(AlertInfo alertInfo) {
alertFlow(alertInfo, Stage.OCCURRED, null);
}
/**
* 报警信息流转
*/
public void alertFlow(AlertInfo alert, AlertStage alertStage, Map<String, Object> param) {
AlertProcessHandler<?, ?, ?> handler = handlerTable.get(alert.getClass(),
alertStage);
if (handler != null) {
handler.accept(alert, param);
}
}
public AlertInfo getAlert(String alertId) {
return alertMap.get(alertId);
}
public <E extends AlertInfo> E getAlert(String alertId, Class<E> cls) {
AlertInfo alert = alertMap.get(alertId);
if (cls.isInstance(alert)) {
return cls.cast(alert);
}
throw new RuntimeException();
}
}

View File

@ -1,27 +0,0 @@
package club.joylink.xiannccda.alert.xian3;
import club.joylink.xiannccda.alert.core.AlertHandler;
import java.util.Map;
import org.springframework.stereotype.Component;
/**
* 西安3报警处理器
*/
@Component
public class Xian3AlertHandler implements AlertHandler<Xian3Alert, Xian3AlertParam> {
@Override
public Class<Xian3Alert> getHandleableClass() {
return Xian3Alert.class;
}
@Override
public Class<Xian3AlertParam> getUpdateParamClass() {
return Xian3AlertParam.class;
}
@Override
public void update(Xian3Alert alert, Xian3AlertParam param) {
}
}

View File

@ -1,27 +0,0 @@
package club.joylink.xiannccda.alert.xian3;
import club.joylink.xiannccda.alert.core.AlertProcessHandler;
import club.joylink.xiannccda.alert.xian3.Xian3Alert.Stage;
public class Xian3AlertOccurredProcessHandler implements AlertProcessHandler<Xian3Alert, Stage, Xian3AlertParam> {
@Override
public Class<Xian3Alert> getAlertClass() {
return Xian3Alert.class;
}
@Override
public Stage getAlertStage() {
return Stage.OCCURRED;
}
@Override
public Class<Xian3AlertParam> getParamClass() {
return Xian3AlertParam.class;
}
@Override
public void accept(Xian3Alert alert, Stage alertStage, Xian3AlertParam param) {
System.out.println("西安三故障发生");
}
}

View File

@ -1,5 +0,0 @@
package club.joylink.xiannccda.alert.xian3;
public class Xian3AlertParam {
}

View File

@ -0,0 +1,32 @@
package club.joylink.xiannccda.alert.xian3;
import club.joylink.xiannccda.alert.core.AlertInfo;
import java.time.LocalDateTime;
public class Xian3SupplyShortageAlert implements AlertInfo {
private String id;
public Xian3SupplyShortageAlert(String id) {
this.id = id;
}
@Override
public String getId() {
return id;
}
@Override
public String getLevel() {
return null;
}
@Override
public LocalDateTime getAlertTime() {
return null;
}
@Override
public String getInfo() {
return null;
}
}

View File

@ -7,16 +7,14 @@ import java.time.LocalDateTime;
/**
* 西安三号线报警
*/
public class Xian3Alert implements AlertInfo {
public class Xian3TrainDelayAlert implements AlertInfo {
private String id;
private Stage stage;
private String level;
private LocalDateTime alertTime;
private String info;
public Xian3Alert(String id) {
public Xian3TrainDelayAlert(String id) {
this.id = id;
this.stage = Stage.OCCURRED;
}
@Override
@ -38,16 +36,4 @@ public class Xian3Alert implements AlertInfo {
public String getInfo() {
return info;
}
@Override
public AlertStage getStage() {
return stage;
}
public enum Stage implements AlertStage {
OCCURRED,
CONFIRM,
PROCESSING,
RESOLVED,
}
}

View File

@ -0,0 +1,57 @@
package club.joylink.xiannccda.core;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import org.springframework.util.CollectionUtils;
/**
* 事件总线
* <p>
* 负责事件的监听器的管理和事件的发布
*/
public class EventEmitter {
private Map<Class<?>, List<Listener<?>>> listenerMap = new HashMap<>();
@Getter
private String id;
public EventEmitter(String id) {
this.id = id;
}
public <E> void addListener(Listener<E> listener) {
for (Method declaredMethod : listener.getClass().getDeclaredMethods()) {
Class<?> eventType = declaredMethod.getParameterTypes()[0];
List<Listener<?>> listeners = listenerMap.computeIfAbsent(eventType, K -> new ArrayList<>());
listeners.add(listener);
}
}
public <E> void removeListeners(Class<E> eventType) {
listenerMap.remove(eventType);
}
public <E> void removeListener(Listener<E> listener) {
for (Method declaredMethod : listener.getClass().getDeclaredMethods()) {
Class<?> eventType = declaredMethod.getParameterTypes()[0];
List<Listener<?>> listeners = listenerMap.get(eventType);
if (!CollectionUtils.isEmpty(listeners)) {
listeners.remove(listener);
}
}
}
public <E> void publish(E event) {
List<Listener<?>> listeners = listenerMap.get(event.getClass());
if (!CollectionUtils.isEmpty(listeners)) {
listeners.forEach(listener -> ((Listener<E>) listener).accept(event));
}
}
}

View File

@ -0,0 +1,6 @@
package club.joylink.xiannccda.core;
public interface Listener<E> {
void accept(E event);
}

View File

@ -0,0 +1,100 @@
package club.joylink.xiannccda.alert;
import club.joylink.xiannccda.alert.core.AlertEventEmitter;
import club.joylink.xiannccda.alert.core.AlertSourceEvent;
import club.joylink.xiannccda.alert.core.AlertSourceEventListener;
import club.joylink.xiannccda.alert.core.AlertMonitoringTask;
import club.joylink.xiannccda.alert.xian3.Xian3SupplyShortageAlert;
import club.joylink.xiannccda.alert.xian3.Xian3TrainDelayAlert;
import club.joylink.xiannccda.core.Listener;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import org.junit.jupiter.api.Test;
public class AlertEventEmitterTest {
@Test
void testEvent() {
String sid = "xian3";
AlertEventEmitter manager = AlertEventEmitter.getInstance(sid);
//可能报警事件监测
manager.addTask(new TrainDelayAlertMonitoringTask(sid));
//可能报警时间监听
manager.addListener(new SuppliesRemainInsufficientListener(sid));
//报警事件监听
manager.addListener(new Listener<Xian3TrainDelayAlert>() {
@Override
public void accept(Xian3TrainDelayAlert event) {
System.out.println("列车延误报警");
}
});
manager.addListener(new Listener<Xian3SupplyShortageAlert>() {
@Override
public void accept(Xian3SupplyShortageAlert event) {
System.out.println("物资紧缺报警");
}
});
manager.taskStart();
manager.publish(new SuppliesCountUpdatedEvent(this, 2));
}
static class TrainDelayAlertMonitoringTask implements AlertMonitoringTask {
String sId;
List<String> trains = new ArrayList<>();
List<Integer> trips = new ArrayList<>();
public TrainDelayAlertMonitoringTask(String sId) {
this.sId = sId;
}
@Override
public String getName() {
return "Train_Delay";
}
@Override
public void run() {
AlertEventEmitter alertEventBus = AlertEventEmitter.getInstance(sId);
alertEventBus.publish(new Xian3TrainDelayAlert("1"));
}
}
static class SuppliesCountUpdatedEvent extends AlertSourceEvent {
@Getter
final int remain;
public SuppliesCountUpdatedEvent(Object source, int remain) {
super(source);
this.remain = remain;
}
}
/**
* 应急物资匮乏报警
*/
static class SuppliesRemainInsufficientListener implements Listener<SuppliesCountUpdatedEvent>{
String sId;
public SuppliesRemainInsufficientListener(String sId) {
this.sId = sId;
}
int SuppliesAlertMin = 10;
@Override
public void accept(SuppliesCountUpdatedEvent event) {
if (event.getRemain() <= SuppliesAlertMin) {
AlertEventEmitter.getInstance(sId).publish(new Xian3SupplyShortageAlert("2"));
}
}
}
}

View File

@ -1,114 +0,0 @@
package club.joylink.xiannccda.alert;
import club.joylink.xiannccda.alert.core.AlertInfo;
import club.joylink.xiannccda.alert.core.EventTaskManager;
import club.joylink.xiannccda.alert.core.AlertSourceEvent;
import club.joylink.xiannccda.alert.core.AlertSourceEventListener;
import club.joylink.xiannccda.alert.core.AlertMonitoringTask;
import club.joylink.xiannccda.alert.xian3.Xian3Alert;
import club.joylink.xiannccda.alert.xian3.Xian3AlertOccurredProcessHandler;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
public class EventTaskManagerTest {
@Test
void testEvent() {
EventTaskManager manager = EventTaskManager.getInstance("id");
List<String> list = List.of("1", "2", "3");
//报警处理
manager.registerAlertHandler(new Xian3AlertOccurredProcessHandler());
//报警事件监测
TrainDelayAlertMonitoringTask task = new TrainDelayAlertMonitoringTask();
task.setInfoSupplier(() -> list);
task.setCallBack(manager::alertOccurred);
manager.registerAlertMonitoringTask(task);
SuppliesRemainInsufficientAlert.init(); // 注册
// 发布此时监听会相应
manager.emit("supplies remain update", new SuppliesCountUpdatedEvent(this, 1));
SuppliesRemainInsufficientAlert.clear(); // 移除
// 发布没有监听不会处理
manager.emit("supplies remain update", new SuppliesCountUpdatedEvent(this, 2));
}
static class TrainDelayAlertMonitoringTask implements AlertMonitoringTask<List<String>> {
private Supplier<List<String>> supplier;
private Consumer<AlertInfo> callBack;
@Override
public String getName() {
return "Train_Delay";
}
@Override
public void setInfoSupplier(Supplier<List<String>> supplier) {
this.supplier = supplier;
}
@Override
public Supplier<List<String>> getInfoSupplier() {
return supplier;
}
@Override
public void setCallBack(Consumer<AlertInfo> callBack) {
this.callBack = callBack;
}
@Override
public Consumer<AlertInfo> getCallBack() {
return callBack;
}
@Override
public void run() {
List<String> info = getInfo();
if (info.contains("1")) {
getCallBack().accept(new Xian3Alert("xian3"));
}
}
}
static class SuppliesCountUpdatedEvent extends AlertSourceEvent {
final int remain;
public SuppliesCountUpdatedEvent(Object source, int remain) {
super(source);
this.remain = remain;
}
}
/**
* 应急物资匮乏报警
*/
class SuppliesRemainInsufficientAlert{
static final int SuppliesAlertMin = 10;
static AlertSourceEventListener<SuppliesCountUpdatedEvent> listener;
static void init() {
listener = (AlertSourceEventListener<SuppliesCountUpdatedEvent>) event -> {
if (event.remain < SuppliesAlertMin) {
// 构建并发布物资匮乏报警
System.out.println("物资匮乏报警");
}
};
EventTaskManager.getInstance("id").on("supplies remain update", listener);
}
static void clear() {
EventTaskManager.getInstance("id").off("supplies remain update", listener);
}
}
}