【设备状态转换】

This commit is contained in:
weizhihong 2023-06-21 14:22:21 +08:00
parent 95db3729b5
commit bcb50b8381
8 changed files with 108 additions and 45 deletions

View File

@ -0,0 +1,18 @@
package club.joylink.xiannccda.ats.message.collect;
import club.joylink.xiannccda.ats.message.MessageData;
import java.util.List;
/** 消息数据额外转换时间,比如:线路的数据转换为线网数据 */
public abstract class DeviceStatusConvertor {
/**
* 获取转换ID
*
* @return 转换ID
*/
abstract String getId();
/** 任务执行逻辑 */
abstract void run(List<MessageData> messageData);
}

View File

@ -0,0 +1,53 @@
package club.joylink.xiannccda.ats.message.collect;
import club.joylink.xiannccda.ats.message.MessageData;
import club.joylink.xiannccda.ats.message.MessageId;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** 消息与转换时间管理 */
public class DeviceStatusConvertorManager {
/** 消息类型,与转换列表 */
private static final Map<MessageId, Map<String, DeviceStatusConvertor>> CONVERTOR_MAP =
new ConcurrentHashMap<>();
/**
* 添加转换逻辑程序
*
* @param msgId 消息ID
* @param convertor 转换对象
*/
public static void addStatusConvertor(MessageId msgId, DeviceStatusConvertor convertor) {
Map<String, DeviceStatusConvertor> convertorMap =
CONVERTOR_MAP.computeIfAbsent(msgId, k -> new ConcurrentHashMap<>());
convertorMap.put(convertor.getId(), convertor);
}
/**
* 移除转换逻辑
*
* @param msgId 消息ID
* @param id 装换ID
*/
public static void removeStatusConvertor(MessageId msgId, String id) {
if (CONVERTOR_MAP.containsKey(msgId)) {
Map<String, DeviceStatusConvertor> convertorMap = CONVERTOR_MAP.get(msgId);
convertorMap.remove(id);
}
}
/**
* 转换数据消息
*
* @param dataList 消息数据列表
*/
public static void doConvertor(List<MessageData> dataList) {
dataList.stream()
.filter(m -> CONVERTOR_MAP.containsKey(m.getMsgId()))
.collect(Collectors.groupingBy(MessageData::getMsgId))
.forEach((k, v) -> CONVERTOR_MAP.get(k).values().forEach(c -> c.run(v)));
}
}

View File

@ -2,7 +2,6 @@ package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
@ -18,7 +17,7 @@ public class DeviceStatusData {
private Map<String, Map<String, Builder>> allDeviceMap = new ConcurrentHashMap<>();
/** 增量的设备更新信息 */
private Map<String, List<Message>> statusVOMap = new ConcurrentHashMap<>();
private Map<String, Map<String, Message>> statusVOMap = new ConcurrentHashMap<>();
public DeviceStatusData(String lineCode) {
this.lineCode = lineCode;

View File

@ -2,6 +2,7 @@ package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -54,7 +55,7 @@ public abstract class DeviceStatusDataOperate {
// 获取全量数据
Map<String, Map<String, Builder>> allDeviceMap = data.getAllDeviceMap();
// 最后增量数据
Map<String, List<Message>> statusVOMap = new ConcurrentHashMap<>();
Map<String, Map<String, Message>> statusVOMap = new ConcurrentHashMap<>();
builders.stream()
.collect(Collectors.groupingBy(DeviceStatusDataOperate::findType))
.forEach(
@ -66,27 +67,14 @@ public abstract class DeviceStatusDataOperate {
Map<String, Builder> newDeviceMap =
v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b));
// 对比结果
List<Message> compareList = compare(deviceStatusMap, newDeviceMap);
if (!CollectionUtils.isEmpty(compareList)) {
statusVOMap.put(k, compareList);
Map<String, Message> compareMap = compare(deviceStatusMap, newDeviceMap);
if (!CollectionUtils.isEmpty(compareMap)) {
statusVOMap.put(k, compareMap);
}
});
data.getStatusVOMap().putAll(statusVOMap);
}
/**
* 清除线路里的所有设备数据
*
* @param data 线路数据
*/
public static void resetDevice(DeviceStatusData data) {
if (data == null) {
return;
}
data.getAllDeviceMap().clear();
data.getStatusVOMap().clear();
}
/**
* 对比获取设备变化了的状态信息并更新状态信息
*
@ -94,23 +82,25 @@ public abstract class DeviceStatusDataOperate {
* @param newMap 采集到状态信息
* @return 发生变化的状态信息
*/
public static List<Message> compare(Map<String, Builder> curMap, Map<String, Builder> newMap) {
List<Message> messageList = new LinkedList<>();
private static Map<String, Message> compare(
Map<String, Builder> curMap, Map<String, Builder> newMap) {
if (newMap != null) {
Map<String, Message> messageMap = new HashMap<>(newMap.size());
newMap.forEach(
(k, v) -> {
if (curMap.containsKey(k)) { // 包含状态对比状态是否一致
Message message = v.build();
if (!curMap.get(k).build().equals(message)) {
curMap.get(k).mergeFrom(message);
messageList.add(message);
messageMap.put(k, message);
}
} else { // 不包含直接添加状态
curMap.put(k, v.clone());
messageList.add(curMap.get(k).build());
messageMap.put(k, curMap.get(k).build());
}
});
return messageMap;
}
return messageList;
return Map.of();
}
}

View File

@ -11,15 +11,14 @@ import org.springframework.util.CollectionUtils;
/**
* 事件总线
* <p>
* 负责事件的监听器的管理和事件的发布
*
* <p>负责事件的监听器的管理和事件的发布
*/
public class EventEmitter {
private Map<Class<?>, List<Listener<?>>> listenerMap = new HashMap<>();
@Getter
private String id;
@Getter private String id;
public EventEmitter(String id) {
this.id = id;

View File

@ -159,7 +159,6 @@ public class LineDeviceStatusService {
offsetMap.put(k, 0); // 位置归零
v.setDirection(v.getDirection() == 0 ? 1 : 0); // 方向转换
} else {
log.info(k + "路径:" + v.getDevName() + "->" + path.get(offsetMap.get(k)));
v.setDevName(path.get(offsetMap.get(k))); // 更新到下一个位置
v.setDestinationId(path.size() - 1);
offsetMap.put(k, offsetMap.get(k) + 1);

View File

@ -6,6 +6,7 @@ import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import com.google.protobuf.Message;
import com.google.protobuf.GeneratedMessageV3.Builder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -35,9 +36,11 @@ public class LineNetMessageServer implements IMessageServer {
(fieldType) -> {
Map<String, Builder> deviceMap = dataSource.getAllDeviceMap().get(fieldType);
if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态则全部放入
return deviceMap.values().stream().map(Builder::build).toList();
Map<String, Message> messageMap = new HashMap<>(deviceMap.size());
deviceMap.forEach((k, v) -> messageMap.put(k, v.build()));
return messageMap;
}
return List.of();
return Map.of();
});
byte[] bytes = message.toByteArray();
log.info("【线网订阅】全量数据量:" + bytes.length);
@ -46,20 +49,22 @@ public class LineNetMessageServer implements IMessageServer {
@Override
public int getInterval() {
return 2000;
return 1000;
}
@Override
public List<TopicMessage> onTick() {
List<TopicMessage> topicMessages = new ArrayList<>();
WsLineNetMessage message =
commonCollectFunction(
(fieldType) -> dataSource.getStatusVOMap().getOrDefault(fieldType, List.of()));
byte[] bytes = message.toByteArray();
// 清空增量
dataSource.getStatusVOMap().clear();
log.info("【线网订阅】数据状态变化量:" + bytes.length);
topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes));
if (!dataSource.getStatusVOMap().isEmpty()) {
WsLineNetMessage message =
commonCollectFunction(
(fieldType) -> dataSource.getStatusVOMap().getOrDefault(fieldType, Map.of()));
byte[] bytes = message.toByteArray();
// 清空增量
dataSource.getStatusVOMap().clear();
log.info("【线网订阅】数据状态变化量:" + bytes.length);
topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes));
}
return topicMessages;
}
@ -69,7 +74,7 @@ public class LineNetMessageServer implements IMessageServer {
* @param compareFun 额外的处理逻辑
* @return 消息内容
*/
public WsLineNetMessage commonCollectFunction(Function<String, List<Message>> compareFun) {
public WsLineNetMessage commonCollectFunction(Function<String, Map<String, Message>> compareFun) {
WsLineNetMessage.Builder builder = WsLineNetMessage.newBuilder();
// 消息体字段列表
builder.getDescriptorForType().getFields().stream()
@ -77,9 +82,9 @@ public class LineNetMessageServer implements IMessageServer {
.forEach(
field -> {
String fieldType = field.getMessageType().getName(); // 字段类型
List<Message> allDeviceList = compareFun.apply(fieldType);
if (!CollectionUtils.isEmpty(allDeviceList)) {
builder.setField(field, allDeviceList);
Map<String, Message> allDeviceMap = compareFun.apply(fieldType);
if (!CollectionUtils.isEmpty(allDeviceMap)) {
builder.setField(field, new ArrayList<>(allDeviceMap.values()));
}
});
return builder.build();

@ -1 +1 @@
Subproject commit a305eb80720363713dfbb6d732d21f1fbd7e61b3
Subproject commit 3bd4f023cd1810dd1ec9611c3714d626a08377d2