【删除对比逻辑】

This commit is contained in:
weizhihong 2023-06-20 11:11:31 +08:00
parent 48078e2186
commit b9478a544f
7 changed files with 66 additions and 217 deletions

View File

@ -1,119 +0,0 @@
package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.springframework.util.CollectionUtils;
/** 收集状态方法 */
public abstract class DeviceStatusCollect {
/**
* 返回全量设备状态信息
*
* @param builder 返回的消息类型
* @param lineDataList 要处理的线路数据列表
* @return 状态消息
*/
public static Message collectAllStatus(Builder builder, List<DeviceStatusData> lineDataList) {
return commonCollectFunction(
builder,
(data, fieldType) -> {
Map<String, Map<String, Builder>> allDevice = data.getAllDeviceMap();
Map<String, Builder> deviceMap = allDevice.get(fieldType);
if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态则全部放入
return deviceMap.values().stream().map(Builder::build).toList();
}
return List.of();
},
lineDataList);
}
/**
* 返回增量设备状态信息
*
* @param lineDataList 线路数据列表
* @return 状态信息
*/
public static Message collectIncrementStatus(
Builder builder, List<DeviceStatusData> lineDataList) {
return commonCollectFunction(
builder,
(data, fieldType) -> {
Map<String, Map<String, Builder>> allDevice = data.getAllDeviceMap();
Map<String, Map<String, Builder>> statusDevice = data.getStatusVOMap();
Map<String, Builder> deviceMap = allDevice.get(fieldType);
if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态则全部放入
// 获取当前存储的状态
Map<String, Builder> statusMap =
statusDevice.computeIfAbsent(fieldType, key -> new HashMap<>(deviceMap.size()));
return compare(statusMap, deviceMap);
}
return List.of();
},
lineDataList);
}
/**
* 对比获取设备变化了的状态信息并更新状态信息
*
* @param curMap 原有的状态信息
* @param newMap 采集到状态信息
* @return 发生变化的状态信息
*/
public static List<Message> compare(Map<String, Builder> curMap, Map<String, Builder> newMap) {
List<Message> messageList = new LinkedList<>();
if (newMap != null) {
newMap.forEach(
(k, v) -> {
if (curMap.containsKey(k)) { // 包含状态对比状态是否一致
Message message = v.build();
if (!curMap.get(k).build().equals(message)) {
curMap.get(k).mergeFrom(message);
messageList.add(message);
}
} else { // 不包含直接添加状态
curMap.put(k, v.clone());
messageList.add(curMap.get(k).build());
}
});
}
return messageList;
}
/**
* 公共处理部分
*
* @param builder 返回的builder对象
* @param compareFun 额外的处理逻辑
* @param lineDataList 线路数据
* @return 消息内容
*/
private static Message commonCollectFunction(
Builder builder,
BiFunction<DeviceStatusData, String, List<Message>> compareFun,
List<DeviceStatusData> lineDataList) {
if (lineDataList == null || lineDataList.size() == 0) {
return builder.build();
}
// 消息体字段列表
builder.getDescriptorForType().getFields().stream()
.filter(f -> f.getType().equals(Type.MESSAGE))
.forEach(
field -> {
String fieldType = field.getMessageType().getName(); // 字段类型
List<Message> allDeviceList = new LinkedList<>();
for (DeviceStatusData data : lineDataList) { // 循环传入的线路数据
allDeviceList.addAll(compareFun.apply(data, fieldType));
}
if (!CollectionUtils.isEmpty(allDeviceList)) {
builder.setField(field, allDeviceList);
}
});
return builder.build();
}
}

View File

@ -1,6 +1,8 @@
package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
@ -15,8 +17,8 @@ public class DeviceStatusData {
/** 所有设备状态信息 <设备类型【这里用的message的类名:Rtu、Signal等】<设备主键,设备信息>> */
private Map<String, Map<String, Builder>> allDeviceMap = new ConcurrentHashMap<>();
/** 当前所有设备状态信息 */
private Map<String, Map<String, Builder>> statusVOMap = new ConcurrentHashMap<>();
/** 增量的设备更新信息 */
private Map<String, List<Message>> statusVOMap = new ConcurrentHashMap<>();
public DeviceStatusData(String lineCode) {
this.lineCode = lineCode;

View File

@ -1,10 +1,13 @@
package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.springframework.util.CollectionUtils;
/** 线路设备操作方法 */
public abstract class DeviceStatusDataOperate {
@ -22,7 +25,7 @@ public abstract class DeviceStatusDataOperate {
* @param builder 设备状态信息
* @return 状态类型
*/
public static String findType(Builder builder) {
private static String findType(Builder builder) {
return builder.getDefaultInstanceForType().getClass().getSimpleName();
}
@ -32,7 +35,7 @@ public abstract class DeviceStatusDataOperate {
* @param protoBuilder 数据信息
* @return 主键
*/
public static String getIdVal(Builder protoBuilder) {
private static String getIdVal(Builder protoBuilder) {
String idName = DEVICE_ID_NAME_MAP.getOrDefault(findType(protoBuilder), DEFAULT_ID_NAME);
return String.valueOf(
protoBuilder.getField(protoBuilder.getDescriptorForType().findFieldByName(idName)));
@ -48,38 +51,29 @@ public abstract class DeviceStatusDataOperate {
if (data == null || builders == null) {
return;
}
Map<String, Map<String, Builder>> map = data.getAllDeviceMap();
// 获取全量数据
Map<String, Map<String, Builder>> allDeviceMap = data.getAllDeviceMap();
// 最后增量数据
Map<String, List<Message>> statusVOMap = new ConcurrentHashMap<>();
builders.stream()
.collect(Collectors.groupingBy(DeviceStatusDataOperate::findType))
.forEach(
(k, v) -> {
// 当前的设备状态
Map<String, Builder> deviceStatusMap =
map.computeIfAbsent(k, key -> new ConcurrentHashMap<>(v.size()));
allDeviceMap.computeIfAbsent(k, key -> new ConcurrentHashMap<>(v.size()));
// 新的设备状态
Map<String, Builder> newDeviceMap =
v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b));
deviceStatusMap.putAll(newDeviceMap);
// 对比结果
List<Message> compareList = compare(deviceStatusMap, newDeviceMap);
if (CollectionUtils.isEmpty(compareList)) {
statusVOMap.put(k, compareList);
}
});
}
/**
* 将采集到的设备状态放入内存中
*
* @param builder 设备状态信息
* @param data 线路的设备信息
* @return 包装的设备状态
*/
public static void addDevice(Builder builder, DeviceStatusData data) {
if (data == null || builder == null) {
return;
}
Map<String, Map<String, Builder>> map = data.getAllDeviceMap();
String buildType = findType(builder);
Map<String, Builder> deviceStatusMap = map.get(buildType);
if (deviceStatusMap == null) {
deviceStatusMap = new ConcurrentHashMap<>();
map.put(buildType, deviceStatusMap);
}
deviceStatusMap.put(getIdVal(builder), builder);
// 清空上次的增量数据
data.getStatusVOMap().clear();
data.getStatusVOMap().putAll(statusVOMap);
}
/**
@ -94,4 +88,31 @@ public abstract class DeviceStatusDataOperate {
data.getAllDeviceMap().clear();
data.getStatusVOMap().clear();
}
/**
* 对比获取设备变化了的状态信息并更新状态信息
*
* @param curMap 原有的状态信息
* @param newMap 采集到状态信息
* @return 发生变化的状态信息
*/
public static List<Message> compare(Map<String, Builder> curMap, Map<String, Builder> newMap) {
List<Message> messageList = new LinkedList<>();
if (newMap != null) {
newMap.forEach(
(k, v) -> {
if (curMap.containsKey(k)) { // 包含状态对比状态是否一致
Message message = v.build();
if (!curMap.get(k).build().equals(message)) {
curMap.get(k).mergeFrom(message);
messageList.add(message);
}
} else { // 不包含直接添加状态
curMap.put(k, v.clone());
messageList.add(curMap.get(k).build());
}
});
}
return messageList;
}
}

View File

@ -1,9 +1,6 @@
package club.joylink.xiannccda.ats.message.collect;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineNetMessage;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -39,68 +36,6 @@ public abstract class DeviceStatusDataRepository {
DeviceStatusDataOperate.addDevices(builders, data);
}
/**
* 添加单个设备数据
*
* @param lineCode 线路编号
* @param builder 设备消息
*/
public static void addDeviceStatusData(String lineCode, Builder builder) {
DeviceStatusData data = lineStatusDataMap.computeIfAbsent(lineCode, DeviceStatusData::new);
DeviceStatusDataOperate.addDevice(builder, data);
}
/**
* 采集线路状态数据
*
* @param lineCode 线路编码
* @return 消息数据
*/
public static Message collectLineAllMessage(String lineCode) {
return DeviceStatusCollect.collectAllStatus(
WsLineMessage.newBuilder(), List.of(lineStatusDataMap.get(lineCode)));
}
/**
* 采集增量线路数据
*
* @param lineCode 线路编码
* @return 消息数据
*/
public static Message collectLineMessage(String lineCode) {
return DeviceStatusCollect.collectIncrementStatus(
WsLineMessage.newBuilder(), List.of(lineStatusDataMap.get(lineCode)));
}
/**
* 发送线网消息数据
*
* @return 消息数据
*/
public static Message collectLineNetAllMessage() {
return DeviceStatusCollect.collectAllStatus(
WsLineNetMessage.newBuilder(), lineStatusDataMap.values().stream().toList());
}
/**
* 发送线网增量消息数据
*
* @return 消息数据
*/
public static Message collectLineNetMessage() {
return DeviceStatusCollect.collectIncrementStatus(
WsLineNetMessage.newBuilder(), lineStatusDataMap.values().stream().toList());
}
/**
* 获取现在存在的线路编码列表
*
* @return 线路编码列表
*/
public static List<String> getLineCodeList() {
return lineStatusDataMap.keySet().stream().toList();
}
/**
* 获取线路总数据
*

View File

@ -12,13 +12,13 @@ public class LineNetMessageServer implements IMessageServer {
@Override
public String getDestinationPattern() {
return "/queue/line/all";
return "/queue/lineNet";
}
@Override
public Object onSubscription(String destination, Map<String, String> paramMap) {
log.info("【线网订阅】发布全量数据");
byte[] bytes = DeviceStatusDataRepository.collectLineNetAllMessage().toByteArray();
byte[] bytes = new byte[1];
log.info("【线网订阅】全量数据量:" + bytes.length);
return bytes;
}
@ -31,7 +31,7 @@ public class LineNetMessageServer implements IMessageServer {
@Override
public List<TopicMessage> onTick() {
List<TopicMessage> topicMessages = new ArrayList<>();
byte[] bytes = DeviceStatusDataRepository.collectLineNetMessage().toByteArray();
byte[] bytes = new byte[1];
log.info("【线网订阅】数据状态变化量:" + bytes.length);
topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes));
return topicMessages;

View File

@ -29,7 +29,12 @@ public class TestMessageServer implements IMessageServer {
String lineId = paramMap.get(LineIdPathKey);
BusinessExceptionAssertEnum.ARGUMENT_ILLEGAL.assertNotNull(lineId);
log.info("线路lineId={}订阅,发布全量数据", lineId);
byte[] bytes = DeviceStatusDataRepository.collectLineAllMessage(lineId).toByteArray();
Builder builder = WsLineMessage.newBuilder();
builder.addRtu(Rtu.newBuilder().setId("rtu1").setIpRtuStusInCentralCtrl(true).build());
builder.addSignal(
Signal.newBuilder().setId("signal1").setRedOpen(true).setAutoMode(true).build());
byte[] bytes = builder.build().toByteArray();
System.out.println(JSON.toJSONString(bytes));
return bytes;
}

View File

@ -34,7 +34,12 @@ public class TestMessageServer2 implements IMessageServer {
@Override
public Object onSubscription(String destination, Map<String, String> paramMap) {
log.info("线路lineId={}订阅,发布全量数据", lineId);
byte[] bytes = DeviceStatusDataRepository.collectLineAllMessage(lineId).toByteArray();
Builder builder = WsLineMessage.newBuilder();
builder.addRtu(Rtu.newBuilder().setId("rtu1").setIpRtuStusInCentralCtrl(true).build());
builder.addSignal(
Signal.newBuilder().setId("signal1").setRedOpen(true).setAutoMode(true).build());
byte[] bytes = builder.build().toByteArray();
System.out.println(JSON.toJSONString(bytes));
return bytes;
}