From b9478a544f80e8756d849179f68e2109de97c65a Mon Sep 17 00:00:00 2001 From: weizhihong Date: Tue, 20 Jun 2023 11:11:31 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=88=A0=E9=99=A4=E5=AF=B9=E6=AF=94?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E3=80=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/collect/DeviceStatusCollect.java | 119 ------------------ .../ats/message/collect/DeviceStatusData.java | 6 +- .../collect/DeviceStatusDataOperate.java | 73 +++++++---- .../collect/DeviceStatusDataRepository.java | 65 ---------- .../xiannccda/ws/LineNetMessageServer.java | 6 +- .../xiannccda/ws/TestMessageServer.java | 7 +- .../xiannccda/ws/TestMessageServer2.java | 7 +- 7 files changed, 66 insertions(+), 217 deletions(-) delete mode 100644 src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusCollect.java diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusCollect.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusCollect.java deleted file mode 100644 index f6859d1..0000000 --- a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusCollect.java +++ /dev/null @@ -1,119 +0,0 @@ -package club.joylink.xiannccda.ats.message.collect; - -import com.google.protobuf.Descriptors.FieldDescriptor.Type; -import com.google.protobuf.GeneratedMessageV3.Builder; -import com.google.protobuf.Message; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; -import org.springframework.util.CollectionUtils; - -/** 收集状态方法 */ -public abstract class DeviceStatusCollect { - /** - * 返回全量设备状态信息 - * - * @param builder 返回的消息类型 - * @param lineDataList 要处理的线路数据列表 - * @return 状态消息 - */ - public static Message collectAllStatus(Builder builder, List lineDataList) { - return commonCollectFunction( - builder, - (data, fieldType) -> { - Map> allDevice = data.getAllDeviceMap(); - Map deviceMap = allDevice.get(fieldType); - if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态,则全部放入 - return deviceMap.values().stream().map(Builder::build).toList(); - } - return List.of(); - }, - lineDataList); - } - - /** - * 返回增量设备状态信息 - * - * @param lineDataList 线路数据列表 - * @return 状态信息 - */ - public static Message collectIncrementStatus( - Builder builder, List lineDataList) { - return commonCollectFunction( - builder, - (data, fieldType) -> { - Map> allDevice = data.getAllDeviceMap(); - Map> statusDevice = data.getStatusVOMap(); - Map deviceMap = allDevice.get(fieldType); - if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态,则全部放入 - // 获取当前存储的状态 - Map statusMap = - statusDevice.computeIfAbsent(fieldType, key -> new HashMap<>(deviceMap.size())); - return compare(statusMap, deviceMap); - } - return List.of(); - }, - lineDataList); - } - - /** - * 对比获取设备变化了的状态信息并更新状态信息 - * - * @param curMap 原有的状态信息 - * @param newMap 采集到状态信息 - * @return 发生变化的状态信息 - */ - public static List compare(Map curMap, Map newMap) { - List messageList = new LinkedList<>(); - if (newMap != null) { - newMap.forEach( - (k, v) -> { - if (curMap.containsKey(k)) { // 包含状态,对比状态是否一致 - Message message = v.build(); - if (!curMap.get(k).build().equals(message)) { - curMap.get(k).mergeFrom(message); - messageList.add(message); - } - } else { // 不包含直接添加状态 - curMap.put(k, v.clone()); - messageList.add(curMap.get(k).build()); - } - }); - } - return messageList; - } - - /** - * 公共处理部分 - * - * @param builder 返回的builder对象 - * @param compareFun 额外的处理逻辑 - * @param lineDataList 线路数据 - * @return 消息内容 - */ - private static Message commonCollectFunction( - Builder builder, - BiFunction> compareFun, - List lineDataList) { - if (lineDataList == null || lineDataList.size() == 0) { - return builder.build(); - } - // 消息体字段列表 - builder.getDescriptorForType().getFields().stream() - .filter(f -> f.getType().equals(Type.MESSAGE)) - .forEach( - field -> { - String fieldType = field.getMessageType().getName(); // 字段类型 - List allDeviceList = new LinkedList<>(); - for (DeviceStatusData data : lineDataList) { // 循环传入的线路数据 - allDeviceList.addAll(compareFun.apply(data, fieldType)); - } - if (!CollectionUtils.isEmpty(allDeviceList)) { - builder.setField(field, allDeviceList); - } - }); - return builder.build(); - } -} diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java index 5f59962..dfa2389 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusData.java @@ -1,6 +1,8 @@ package club.joylink.xiannccda.ats.message.collect; import com.google.protobuf.GeneratedMessageV3.Builder; +import com.google.protobuf.Message; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; @@ -15,8 +17,8 @@ public class DeviceStatusData { /** 所有设备状态信息 <设备类型【这里用的message的类名:Rtu、Signal等】,<设备主键,设备信息>> */ private Map> allDeviceMap = new ConcurrentHashMap<>(); - /** 当前所有设备状态信息 */ - private Map> statusVOMap = new ConcurrentHashMap<>(); + /** 增量的设备更新信息 */ + private Map> statusVOMap = new ConcurrentHashMap<>(); public DeviceStatusData(String lineCode) { this.lineCode = lineCode; diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java index 337f448..4071261 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataOperate.java @@ -1,10 +1,13 @@ package club.joylink.xiannccda.ats.message.collect; import com.google.protobuf.GeneratedMessageV3.Builder; +import com.google.protobuf.Message; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.springframework.util.CollectionUtils; /** 线路设备操作方法 */ public abstract class DeviceStatusDataOperate { @@ -22,7 +25,7 @@ public abstract class DeviceStatusDataOperate { * @param builder 设备状态信息 * @return 状态类型 */ - public static String findType(Builder builder) { + private static String findType(Builder builder) { return builder.getDefaultInstanceForType().getClass().getSimpleName(); } @@ -32,7 +35,7 @@ public abstract class DeviceStatusDataOperate { * @param protoBuilder 数据信息 * @return 主键 */ - public static String getIdVal(Builder protoBuilder) { + private static String getIdVal(Builder protoBuilder) { String idName = DEVICE_ID_NAME_MAP.getOrDefault(findType(protoBuilder), DEFAULT_ID_NAME); return String.valueOf( protoBuilder.getField(protoBuilder.getDescriptorForType().findFieldByName(idName))); @@ -48,38 +51,29 @@ public abstract class DeviceStatusDataOperate { if (data == null || builders == null) { return; } - Map> map = data.getAllDeviceMap(); + // 获取全量数据 + Map> allDeviceMap = data.getAllDeviceMap(); + // 最后增量数据 + Map> statusVOMap = new ConcurrentHashMap<>(); builders.stream() .collect(Collectors.groupingBy(DeviceStatusDataOperate::findType)) .forEach( (k, v) -> { + // 当前的设备状态 Map deviceStatusMap = - map.computeIfAbsent(k, key -> new ConcurrentHashMap<>(v.size())); + allDeviceMap.computeIfAbsent(k, key -> new ConcurrentHashMap<>(v.size())); + // 新的设备状态 Map newDeviceMap = v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b)); - deviceStatusMap.putAll(newDeviceMap); + // 对比结果 + List compareList = compare(deviceStatusMap, newDeviceMap); + if (CollectionUtils.isEmpty(compareList)) { + statusVOMap.put(k, compareList); + } }); - } - - /** - * 将采集到的设备状态放入内存中 - * - * @param builder 设备状态信息 - * @param data 线路的设备信息 - * @return 包装的设备状态 - */ - public static void addDevice(Builder builder, DeviceStatusData data) { - if (data == null || builder == null) { - return; - } - Map> map = data.getAllDeviceMap(); - String buildType = findType(builder); - Map deviceStatusMap = map.get(buildType); - if (deviceStatusMap == null) { - deviceStatusMap = new ConcurrentHashMap<>(); - map.put(buildType, deviceStatusMap); - } - deviceStatusMap.put(getIdVal(builder), builder); + // 清空上次的增量数据 + data.getStatusVOMap().clear(); + data.getStatusVOMap().putAll(statusVOMap); } /** @@ -94,4 +88,31 @@ public abstract class DeviceStatusDataOperate { data.getAllDeviceMap().clear(); data.getStatusVOMap().clear(); } + + /** + * 对比获取设备变化了的状态信息并更新状态信息 + * + * @param curMap 原有的状态信息 + * @param newMap 采集到状态信息 + * @return 发生变化的状态信息 + */ + public static List compare(Map curMap, Map newMap) { + List messageList = new LinkedList<>(); + if (newMap != null) { + newMap.forEach( + (k, v) -> { + if (curMap.containsKey(k)) { // 包含状态,对比状态是否一致 + Message message = v.build(); + if (!curMap.get(k).build().equals(message)) { + curMap.get(k).mergeFrom(message); + messageList.add(message); + } + } else { // 不包含直接添加状态 + curMap.put(k, v.clone()); + messageList.add(curMap.get(k).build()); + } + }); + } + return messageList; + } } diff --git a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataRepository.java b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataRepository.java index a255dd5..a842c5c 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataRepository.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/collect/DeviceStatusDataRepository.java @@ -1,9 +1,6 @@ package club.joylink.xiannccda.ats.message.collect; -import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage; -import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineNetMessage; import com.google.protobuf.GeneratedMessageV3.Builder; -import com.google.protobuf.Message; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -39,68 +36,6 @@ public abstract class DeviceStatusDataRepository { DeviceStatusDataOperate.addDevices(builders, data); } - /** - * 添加单个设备数据 - * - * @param lineCode 线路编号 - * @param builder 设备消息 - */ - public static void addDeviceStatusData(String lineCode, Builder builder) { - DeviceStatusData data = lineStatusDataMap.computeIfAbsent(lineCode, DeviceStatusData::new); - DeviceStatusDataOperate.addDevice(builder, data); - } - - /** - * 采集线路状态数据 - * - * @param lineCode 线路编码 - * @return 消息数据 - */ - public static Message collectLineAllMessage(String lineCode) { - return DeviceStatusCollect.collectAllStatus( - WsLineMessage.newBuilder(), List.of(lineStatusDataMap.get(lineCode))); - } - - /** - * 采集增量线路数据 - * - * @param lineCode 线路编码 - * @return 消息数据 - */ - public static Message collectLineMessage(String lineCode) { - return DeviceStatusCollect.collectIncrementStatus( - WsLineMessage.newBuilder(), List.of(lineStatusDataMap.get(lineCode))); - } - - /** - * 发送线网消息数据 - * - * @return 消息数据 - */ - public static Message collectLineNetAllMessage() { - return DeviceStatusCollect.collectAllStatus( - WsLineNetMessage.newBuilder(), lineStatusDataMap.values().stream().toList()); - } - - /** - * 发送线网增量消息数据 - * - * @return 消息数据 - */ - public static Message collectLineNetMessage() { - return DeviceStatusCollect.collectIncrementStatus( - WsLineNetMessage.newBuilder(), lineStatusDataMap.values().stream().toList()); - } - - /** - * 获取现在存在的线路编码列表 - * - * @return 线路编码列表 - */ - public static List getLineCodeList() { - return lineStatusDataMap.keySet().stream().toList(); - } - /** * 获取线路总数据 * diff --git a/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java b/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java index b6271fa..bfa022b 100644 --- a/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java +++ b/src/main/java/club/joylink/xiannccda/ws/LineNetMessageServer.java @@ -12,13 +12,13 @@ public class LineNetMessageServer implements IMessageServer { @Override public String getDestinationPattern() { - return "/queue/line/all"; + return "/queue/lineNet"; } @Override public Object onSubscription(String destination, Map paramMap) { log.info("【线网订阅】发布全量数据"); - byte[] bytes = DeviceStatusDataRepository.collectLineNetAllMessage().toByteArray(); + byte[] bytes = new byte[1]; log.info("【线网订阅】全量数据量:" + bytes.length); return bytes; } @@ -31,7 +31,7 @@ public class LineNetMessageServer implements IMessageServer { @Override public List onTick() { List topicMessages = new ArrayList<>(); - byte[] bytes = DeviceStatusDataRepository.collectLineNetMessage().toByteArray(); + byte[] bytes = new byte[1]; log.info("【线网订阅】数据状态变化量:" + bytes.length); topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes)); return topicMessages; diff --git a/src/main/java/club/joylink/xiannccda/ws/TestMessageServer.java b/src/main/java/club/joylink/xiannccda/ws/TestMessageServer.java index c4b7225..ea4c566 100644 --- a/src/main/java/club/joylink/xiannccda/ws/TestMessageServer.java +++ b/src/main/java/club/joylink/xiannccda/ws/TestMessageServer.java @@ -29,7 +29,12 @@ public class TestMessageServer implements IMessageServer { String lineId = paramMap.get(LineIdPathKey); BusinessExceptionAssertEnum.ARGUMENT_ILLEGAL.assertNotNull(lineId); log.info("线路lineId={}订阅,发布全量数据", lineId); - byte[] bytes = DeviceStatusDataRepository.collectLineAllMessage(lineId).toByteArray(); + Builder builder = WsLineMessage.newBuilder(); + builder.addRtu(Rtu.newBuilder().setId("rtu1").setIpRtuStusInCentralCtrl(true).build()); + builder.addSignal( + Signal.newBuilder().setId("signal1").setRedOpen(true).setAutoMode(true).build()); + + byte[] bytes = builder.build().toByteArray(); System.out.println(JSON.toJSONString(bytes)); return bytes; } diff --git a/src/main/java/club/joylink/xiannccda/ws/TestMessageServer2.java b/src/main/java/club/joylink/xiannccda/ws/TestMessageServer2.java index 1ab1920..7a05227 100644 --- a/src/main/java/club/joylink/xiannccda/ws/TestMessageServer2.java +++ b/src/main/java/club/joylink/xiannccda/ws/TestMessageServer2.java @@ -34,7 +34,12 @@ public class TestMessageServer2 implements IMessageServer { @Override public Object onSubscription(String destination, Map paramMap) { log.info("线路lineId={}订阅,发布全量数据", lineId); - byte[] bytes = DeviceStatusDataRepository.collectLineAllMessage(lineId).toByteArray(); + Builder builder = WsLineMessage.newBuilder(); + builder.addRtu(Rtu.newBuilder().setId("rtu1").setIpRtuStusInCentralCtrl(true).build()); + builder.addSignal( + Signal.newBuilder().setId("signal1").setRedOpen(true).setAutoMode(true).build()); + + byte[] bytes = builder.build().toByteArray(); System.out.println(JSON.toJSONString(bytes)); return bytes; }