From bcb50b83813511e21b8eeff8cb9bb1501515b6c4 Mon Sep 17 00:00:00 2001 From: weizhihong Date: Wed, 21 Jun 2023 14:22:21 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E8=BD=AC=E6=8D=A2=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collect/DeviceStatusConvertor.java | 18 +++++++ .../collect/DeviceStatusConvertorManager.java | 53 +++++++++++++++++++ .../ats/message/collect/DeviceStatusData.java | 3 +- .../collect/DeviceStatusDataOperate.java | 34 +++++------- .../joylink/xiannccda/event/EventEmitter.java | 7 ++- .../service/LineDeviceStatusService.java | 1 - .../xiannccda/ws/LineNetMessageServer.java | 35 ++++++------ xian-ncc-da-message | 2 +- 8 files changed, 108 insertions(+), 45 deletions(-) create mode 100644 src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertor.java create mode 100644 src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertorManager.java diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertor.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertor.java new file mode 100644 index 0000000..8b5ea70 --- /dev/null +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertor.java @@ -0,0 +1,18 @@ +package club.joylink.xiannccda.ats.message.collect; + +import club.joylink.xiannccda.ats.message.MessageData; +import java.util.List; + +/** 消息数据额外转换时间,比如:线路的数据转换为线网数据 */ +public abstract class DeviceStatusConvertor { + + /** + * 获取转换ID + * + * @return 转换ID + */ + abstract String getId(); + + /** 任务执行逻辑 */ + abstract void run(List messageData); +} diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertorManager.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertorManager.java new file mode 100644 index 0000000..05a16b5 --- /dev/null +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusConvertorManager.java @@ -0,0 +1,53 @@ +package club.joylink.xiannccda.ats.message.collect; + +import club.joylink.xiannccda.ats.message.MessageData; +import club.joylink.xiannccda.ats.message.MessageId; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** 消息与转换时间管理 */ +public class DeviceStatusConvertorManager { + + /** 消息类型,与转换列表 */ + private static final Map> CONVERTOR_MAP = + new ConcurrentHashMap<>(); + + /** + * 添加转换逻辑程序 + * + * @param msgId 消息ID + * @param convertor 转换对象 + */ + public static void addStatusConvertor(MessageId msgId, DeviceStatusConvertor convertor) { + Map convertorMap = + CONVERTOR_MAP.computeIfAbsent(msgId, k -> new ConcurrentHashMap<>()); + convertorMap.put(convertor.getId(), convertor); + } + + /** + * 移除转换逻辑 + * + * @param msgId 消息ID + * @param id 装换ID + */ + public static void removeStatusConvertor(MessageId msgId, String id) { + if (CONVERTOR_MAP.containsKey(msgId)) { + Map convertorMap = CONVERTOR_MAP.get(msgId); + convertorMap.remove(id); + } + } + + /** + * 转换数据消息 + * + * @param dataList 消息数据列表 + */ + public static void doConvertor(List dataList) { + dataList.stream() + .filter(m -> CONVERTOR_MAP.containsKey(m.getMsgId())) + .collect(Collectors.groupingBy(MessageData::getMsgId)) + .forEach((k, v) -> CONVERTOR_MAP.get(k).values().forEach(c -> c.run(v))); + } +} diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java index dfa2389..8bd9444 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java @@ -2,7 +2,6 @@ package club.joylink.xiannccda.ats.message.collect; import com.google.protobuf.GeneratedMessageV3.Builder; import com.google.protobuf.Message; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; @@ -18,7 +17,7 @@ public class DeviceStatusData { private Map> allDeviceMap = new ConcurrentHashMap<>(); /** 增量的设备更新信息 */ - private Map> statusVOMap = new ConcurrentHashMap<>(); + private Map> statusVOMap = new ConcurrentHashMap<>(); public DeviceStatusData(String lineCode) { this.lineCode = lineCode; diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java index 95b6914..2a9c773 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java @@ -2,6 +2,7 @@ package club.joylink.xiannccda.ats.message.collect; import com.google.protobuf.GeneratedMessageV3.Builder; import com.google.protobuf.Message; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -54,7 +55,7 @@ public abstract class DeviceStatusDataOperate { // 获取全量数据 Map> allDeviceMap = data.getAllDeviceMap(); // 最后增量数据 - Map> statusVOMap = new ConcurrentHashMap<>(); + Map> statusVOMap = new ConcurrentHashMap<>(); builders.stream() .collect(Collectors.groupingBy(DeviceStatusDataOperate::findType)) .forEach( @@ -66,27 +67,14 @@ public abstract class DeviceStatusDataOperate { Map newDeviceMap = v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b)); // 对比结果 - List compareList = compare(deviceStatusMap, newDeviceMap); - if (!CollectionUtils.isEmpty(compareList)) { - statusVOMap.put(k, compareList); + Map compareMap = compare(deviceStatusMap, newDeviceMap); + if (!CollectionUtils.isEmpty(compareMap)) { + statusVOMap.put(k, compareMap); } }); data.getStatusVOMap().putAll(statusVOMap); } - /** - * 清除线路里的所有设备数据 - * - * @param data 线路数据 - */ - public static void resetDevice(DeviceStatusData data) { - if (data == null) { - return; - } - data.getAllDeviceMap().clear(); - data.getStatusVOMap().clear(); - } - /** * 对比获取设备变化了的状态信息并更新状态信息 * @@ -94,23 +82,25 @@ public abstract class DeviceStatusDataOperate { * @param newMap 采集到状态信息 * @return 发生变化的状态信息 */ - public static List compare(Map curMap, Map newMap) { - List messageList = new LinkedList<>(); + private static Map compare( + Map curMap, Map newMap) { if (newMap != null) { + Map messageMap = new HashMap<>(newMap.size()); newMap.forEach( (k, v) -> { if (curMap.containsKey(k)) { // 包含状态,对比状态是否一致 Message message = v.build(); if (!curMap.get(k).build().equals(message)) { curMap.get(k).mergeFrom(message); - messageList.add(message); + messageMap.put(k, message); } } else { // 不包含直接添加状态 curMap.put(k, v.clone()); - messageList.add(curMap.get(k).build()); + messageMap.put(k, curMap.get(k).build()); } }); + return messageMap; } - return messageList; + return Map.of(); } } diff --git a/src/main/java/club/joylink/xiannccda/event/EventEmitter.java b/src/main/java/club/joylink/xiannccda/event/EventEmitter.java index 47d5bde..403bc70 100644 --- a/src/main/java/club/joylink/xiannccda/event/EventEmitter.java +++ b/src/main/java/club/joylink/xiannccda/event/EventEmitter.java @@ -11,15 +11,14 @@ import org.springframework.util.CollectionUtils; /** * 事件总线。 - *

- * 负责事件的监听器的管理和事件的发布 + * + *

负责事件的监听器的管理和事件的发布 */ public class EventEmitter { private Map, List>> listenerMap = new HashMap<>(); - @Getter - private String id; + @Getter private String id; public EventEmitter(String id) { this.id = id; diff --git a/src/main/java/club/joylink/xiannccda/service/LineDeviceStatusService.java b/src/main/java/club/joylink/xiannccda/service/LineDeviceStatusService.java index 17fa220..068c279 100644 --- a/src/main/java/club/joylink/xiannccda/service/LineDeviceStatusService.java +++ b/src/main/java/club/joylink/xiannccda/service/LineDeviceStatusService.java @@ -159,7 +159,6 @@ public class LineDeviceStatusService { offsetMap.put(k, 0); // 位置归零 v.setDirection(v.getDirection() == 0 ? 1 : 0); // 方向转换 } else { - log.info(k + "路径:" + v.getDevName() + "->" + path.get(offsetMap.get(k))); v.setDevName(path.get(offsetMap.get(k))); // 更新到下一个位置 v.setDestinationId(path.size() - 1); offsetMap.put(k, offsetMap.get(k) + 1); diff --git a/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java b/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java index ed87205..165ff6e 100644 --- a/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java +++ b/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java @@ -6,6 +6,7 @@ import com.google.protobuf.Descriptors.FieldDescriptor.Type; import com.google.protobuf.Message; import com.google.protobuf.GeneratedMessageV3.Builder; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -35,9 +36,11 @@ public class LineNetMessageServer implements IMessageServer { (fieldType) -> { Map deviceMap = dataSource.getAllDeviceMap().get(fieldType); if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态,则全部放入 - return deviceMap.values().stream().map(Builder::build).toList(); + Map messageMap = new HashMap<>(deviceMap.size()); + deviceMap.forEach((k, v) -> messageMap.put(k, v.build())); + return messageMap; } - return List.of(); + return Map.of(); }); byte[] bytes = message.toByteArray(); log.info("【线网订阅】全量数据量:" + bytes.length); @@ -46,20 +49,22 @@ public class LineNetMessageServer implements IMessageServer { @Override public int getInterval() { - return 2000; + return 1000; } @Override public List onTick() { List topicMessages = new ArrayList<>(); - WsLineNetMessage message = - commonCollectFunction( - (fieldType) -> dataSource.getStatusVOMap().getOrDefault(fieldType, List.of())); - byte[] bytes = message.toByteArray(); - // 清空增量 - dataSource.getStatusVOMap().clear(); - log.info("【线网订阅】数据状态变化量:" + bytes.length); - topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes)); + if (!dataSource.getStatusVOMap().isEmpty()) { + WsLineNetMessage message = + commonCollectFunction( + (fieldType) -> dataSource.getStatusVOMap().getOrDefault(fieldType, Map.of())); + byte[] bytes = message.toByteArray(); + // 清空增量 + dataSource.getStatusVOMap().clear(); + log.info("【线网订阅】数据状态变化量:" + bytes.length); + topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes)); + } return topicMessages; } @@ -69,7 +74,7 @@ public class LineNetMessageServer implements IMessageServer { * @param compareFun 额外的处理逻辑 * @return 消息内容 */ - public WsLineNetMessage commonCollectFunction(Function> compareFun) { + public WsLineNetMessage commonCollectFunction(Function> compareFun) { WsLineNetMessage.Builder builder = WsLineNetMessage.newBuilder(); // 消息体字段列表 builder.getDescriptorForType().getFields().stream() @@ -77,9 +82,9 @@ public class LineNetMessageServer implements IMessageServer { .forEach( field -> { String fieldType = field.getMessageType().getName(); // 字段类型 - List allDeviceList = compareFun.apply(fieldType); - if (!CollectionUtils.isEmpty(allDeviceList)) { - builder.setField(field, allDeviceList); + Map allDeviceMap = compareFun.apply(fieldType); + if (!CollectionUtils.isEmpty(allDeviceMap)) { + builder.setField(field, new ArrayList<>(allDeviceMap.values())); } }); return builder.build(); diff --git a/xian-ncc-da-message b/xian-ncc-da-message index a305eb8..3bd4f02 160000 --- a/xian-ncc-da-message +++ b/xian-ncc-da-message @@ -1 +1 @@ -Subproject commit a305eb80720363713dfbb6d732d21f1fbd7e61b3 +Subproject commit 3bd4f023cd1810dd1ec9611c3714d626a08377d2