【线路数据websocket消息逻辑】

This commit is contained in:
weizhihong 2023-06-16 19:41:49 +08:00
parent 5f6a36e933
commit c147993941
11 changed files with 5526 additions and 1383 deletions

View File

@ -1,34 +0,0 @@
package club.joylink.xiannccda.ats.message;
import club.joylink.xiannccda.ats.message.collect.StatusDataRepository;
import com.google.protobuf.Message;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
/** 设备状态发送定时任务 */
@Component
public class DeviceStatusMessageManage implements ApplicationRunner {
static final int Period = 1;
/** 定时任务线程池 */
static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor();
@Override
public void run(ApplicationArguments args) throws Exception {
Executor.scheduleAtFixedRate(
() -> {
List<Message> changesMessage = StatusDataRepository.collectAllCompare();
// 如果变更消息不为空则发送信息
if (!CollectionUtils.isEmpty(changesMessage)) {}
},
Period,
Period,
TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,119 @@
package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.springframework.util.CollectionUtils;
/** 收集状态方法 */
public class DeviceStatusCollect {
/**
* 返回全量设备状态信息
*
* @param builder 返回的消息类型
* @param lineDataList 要处理的线路数据列表
* @return 状态消息
*/
public static Message collectAllStatus(Builder builder, List<DeviceStatusData> lineDataList) {
return commonCollectFunction(
builder,
(data, fieldType) -> {
Map<String, Map<String, Builder>> allDevice = data.getAllDeviceMap();
Map<String, Builder> deviceMap = allDevice.get(fieldType);
if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态则全部放入
return deviceMap.values().stream().map(Builder::build).toList();
}
return List.of();
},
lineDataList);
}
/**
* 返回增量设备状态信息
*
* @param lineDataList 线路数据列表
* @return 状态信息
*/
public static Message collectIncrementStatus(
Builder builder, List<DeviceStatusData> lineDataList) {
return commonCollectFunction(
builder,
(data, fieldType) -> {
Map<String, Map<String, Builder>> allDevice = data.getAllDeviceMap();
Map<String, Map<String, Builder>> statusDevice = data.getStatusVOMap();
Map<String, Builder> deviceMap = allDevice.get(fieldType);
if (!CollectionUtils.isEmpty(deviceMap)) { // 如果存在该类型状态则全部放入
// 获取当前存储的状态
Map<String, Builder> statusMap =
statusDevice.computeIfAbsent(fieldType, key -> new HashMap<>(deviceMap.size()));
return compare(statusMap, deviceMap);
}
return List.of();
},
lineDataList);
}
/**
* 对比获取设备变化了的状态信息并更新状态信息
*
* @param curMap 原有的状态信息
* @param newMap 采集到状态信息
* @return 发生变化的状态信息
*/
public static List<Message> compare(Map<String, Builder> curMap, Map<String, Builder> newMap) {
List<Message> messageList = new LinkedList<>();
if (newMap != null) {
newMap.forEach(
(k, v) -> {
if (curMap.containsKey(k)) { // 包含状态对比状态是否一致
Message message = v.build();
if (!curMap.get(k).build().equals(message)) {
curMap.get(k).mergeFrom(message);
messageList.add(message);
}
} else { // 不包含直接添加状态
curMap.put(k, v.clone());
messageList.add(curMap.get(k).build());
}
});
}
return messageList;
}
/**
* 公共处理部分
*
* @param builder 返回的builder对象
* @param compareFun 额外的处理逻辑
* @param lineDataList 线路数据
* @return 消息内容
*/
private static Message commonCollectFunction(
Builder builder,
BiFunction<DeviceStatusData, String, List<Message>> compareFun,
List<DeviceStatusData> lineDataList) {
if (lineDataList == null || lineDataList.size() == 0) {
return builder.build();
}
// 消息体字段列表
builder.getDescriptorForType().getFields().stream()
.filter(f -> f.getType().equals(Type.MESSAGE))
.forEach(
field -> {
String fieldType = field.getMessageType().getName(); // 字段类型
List<Message> allDeviceList = new LinkedList<>();
for (DeviceStatusData data : lineDataList) { // 循环传入的线路数据
allDeviceList.addAll(compareFun.apply(data, fieldType));
}
if (!CollectionUtils.isEmpty(allDeviceList)) {
builder.setField(field, allDeviceList);
}
});
return builder.build();
}
}

View File

@ -1,99 +1,24 @@
package club.joylink.xiannccda.ats.message.collect;
import club.joylink.xiannccda.dto.protos.WsMessageProto;
import com.google.protobuf.GeneratedMessageV3.Builder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.Getter;
/** 设备状态管理信息 */
public abstract class DeviceStatusData {
/** 线路设备状态信息 */
@Getter
public class DeviceStatusData {
/** 默认的主键名称 */
private static final String DEFAULT_ID_NAME = "id";
/** 线路号 */
private String lineCode;
/** 设备主键名称映射 */
private static final Map<String, String> DEVICE_ID_NAME_MAP =
Map.of("LineNetTrainOffsetDiagram", "groupId");
/** 设备状态信息 */
private static final Map<String, Map<String, Builder>> deviceStatusBuilder =
new ConcurrentHashMap<>();
/** 所有设备状态信息 */
private Map<String, Map<String, Builder>> allDeviceMap = new ConcurrentHashMap<>();
/** 当前所有设备状态信息 */
private static final Map<String, Builder> deviceStatusVOMap = new ConcurrentHashMap<>();
private Map<String, Map<String, Builder>> statusVOMap = new ConcurrentHashMap<>();
/**
* 返回全量设备状态信息
*
* @return 状态消息
*/
public abstract WsMessageProto.WsMessage collectAllStatus();
/**
* 返回增量设备状态信息
*
* @return 状态信息
*/
public abstract WsMessageProto.WsMessage collectIncrementStatus();
/**
* 获取设备状态类型
*
* @param builder 设备状态信息
* @return 状态类型
*/
public static String findType(Builder builder) {
return builder.getDefaultInstanceForType().getClass().getSimpleName();
}
/**
* 获取主键值
*
* @param protoBuilder 数据信息
* @return 主键
*/
protected String getIdVal(Builder protoBuilder) {
String idName = DEVICE_ID_NAME_MAP.getOrDefault(findType(protoBuilder), DEFAULT_ID_NAME);
return String.valueOf(
protoBuilder.getField(protoBuilder.getDescriptorForType().findFieldByName(idName)));
}
/**
* 批量放入设备状态
*
* @param builders 设备状态列表
*/
protected void addAllDeviceStatus(List<Builder> builders) {
builders.stream()
.collect(Collectors.groupingBy(DeviceStatusData::findType))
.forEach(
(k, v) -> {
Map<String, Builder> deviceStatusMap = deviceStatusBuilder.get(k);
if (deviceStatusMap == null) {
deviceStatusMap = new ConcurrentHashMap<>(v.size());
deviceStatusBuilder.put(k, deviceStatusMap);
}
Map<String, Builder> newDeviceMap =
v.stream().collect(Collectors.toMap(this::getIdVal, b -> b));
deviceStatusMap.putAll(newDeviceMap);
});
}
/**
* 将采集到的设备状态放入内存中
*
* @param builder 设备状态信息
* @return 包装的设备状态
*/
protected void addDeviceStatus(Builder builder) {
String buildType = findType(builder);
Map<String, Builder> deviceStatusMap = deviceStatusBuilder.get(buildType);
if (deviceStatusMap == null) {
deviceStatusMap = new ConcurrentHashMap<>();
deviceStatusBuilder.put(buildType, deviceStatusMap);
}
deviceStatusMap.put(getIdVal(builder), builder);
public DeviceStatusData(String lineCode) {
this.lineCode = lineCode;
}
}

View File

@ -0,0 +1,102 @@
package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** 线路设备操作方法 */
public class DeviceStatusDataOperate {
/** 默认的主键名称 */
private static final String DEFAULT_ID_NAME = "id";
/** 设备主键名称映射 */
private static final Map<String, String> DEVICE_ID_NAME_MAP =
Map.of("LineNetTrainOffsetDiagram", "groupId");
/**
* 获取设备状态类型
*
* @param builder 设备状态信息
* @return 状态类型
*/
public static String findType(Builder builder) {
return builder.getDefaultInstanceForType().getClass().getSimpleName();
}
/**
* 获取主键值
*
* @param protoBuilder 数据信息
* @return 主键
*/
public static String getIdVal(Builder protoBuilder) {
String idName = DEVICE_ID_NAME_MAP.getOrDefault(findType(protoBuilder), DEFAULT_ID_NAME);
return String.valueOf(
protoBuilder.getField(protoBuilder.getDescriptorForType().findFieldByName(idName)));
}
/**
* 批量放入设备状态
*
* @param builders 要添加的设备状态列表
* @param data 存储内存
*/
public static void addDevices(List<Builder> builders, DeviceStatusData data) {
if (data == null || builders == null) {
return;
}
Map<String, Map<String, Builder>> map = data.getAllDeviceMap();
builders.stream()
.collect(Collectors.groupingBy(DeviceStatusDataOperate::findType))
.forEach(
(k, v) -> {
Map<String, Builder> deviceStatusMap = map.get(k);
if (deviceStatusMap == null) {
deviceStatusMap = new ConcurrentHashMap<>(v.size());
map.put(k, deviceStatusMap);
}
Map<String, Builder> newDeviceMap =
v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b));
deviceStatusMap.putAll(newDeviceMap);
});
}
/**
* 将采集到的设备状态放入内存中
*
* @param builder 设备状态信息
* @param data 线路的设备信息
* @return 包装的设备状态
*/
public static void addDevice(Builder builder, DeviceStatusData data) {
if (data == null || builder == null) {
return;
}
Map<String, Map<String, Builder>> map = data.getAllDeviceMap();
String buildType = findType(builder);
Map<String, Builder> deviceStatusMap = map.get(buildType);
if (deviceStatusMap == null) {
deviceStatusMap = new ConcurrentHashMap<>();
map.put(buildType, deviceStatusMap);
}
deviceStatusMap.put(getIdVal(builder), builder);
}
/**
* 清除线路里的所有设备数据
*
* @param data 线路数据
*/
public static void resetDevice(DeviceStatusData data) {
if (data == null) {
return;
}
data.getAllDeviceMap().clear();
data.getStatusVOMap().clear();
}
}

View File

@ -1,19 +0,0 @@
package club.joylink.xiannccda.ats.message.collect;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsMessage;
/** 线路设备状态数据 */
public class LineDeviceStatusData extends DeviceStatusData {
@Override
public WsMessage collectAllStatus() {
return null;
}
@Override
public WsMessage collectIncrementStatus() {
return null;
}
}

View File

@ -1,17 +0,0 @@
package club.joylink.xiannccda.ats.message.collect;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsMessage;
/** 线网设备状态数据 */
public class LineNetDeviceStatusData extends DeviceStatusData {
@Override
public WsMessage collectAllStatus() {
return null;
}
@Override
public WsMessage collectIncrementStatus() {
return null;
}
}

View File

@ -1,90 +1,92 @@
package club.joylink.xiannccda.ats.message.collect;
import com.google.protobuf.GeneratedMessageV3;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineNetMessage;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message;
import lombok.Getter;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import org.springframework.stereotype.Component;
/** 状态信息初始化 */
@Getter
@Component
public class StatusDataRepository {
/** 主键字段名称 */
private static final String ID_NAME = "id";
/** 线网数据 */
/** 线路信息 */
/** 设备状态唯一标识 */
// private static
/** 采集回来的设备状态信息 */
private static Map<String, GeneratedMessageV3.Builder> deviceStatusBuilder =
new ConcurrentHashMap<>();
/** 当前所有设备状态信息 */
private static final Map<String, GeneratedMessageV3.Builder> statusDataVOMap =
new ConcurrentHashMap<>();
/** 线路设备状态信息集合 */
private Map<String, DeviceStatusData> lineStatusDataMap = new ConcurrentHashMap<>();
/**
* 初始化时放入设备状态
* 添加所属线路的设备信息
*
* @param builders 设备状态
* @param lineCode 线路编号
* @param builders 设备信息
*/
public void addAllDeviceStatus(List<GeneratedMessageV3.Builder> builders) {
deviceStatusBuilder = new ConcurrentHashMap<>(builders.size());
builders.forEach(StatusDataRepository::addDeviceStatus);
public void addAllDeviceStatusData(String lineCode, List<Builder> builders) {
DeviceStatusData data = lineStatusDataMap.get(lineCode);
if (data == null) {
data = new DeviceStatusData(lineCode);
lineStatusDataMap.put(lineCode, data);
}
DeviceStatusDataOperate.addDevices(builders, data);
}
/**
* 将采集到的设备状态放入内存中
* 添加单个设备数据
*
* @param builder 设备状态信息
* @return 包装的设备状态
* @param lineCode 线路编号
* @param builder 设备消息
*/
public static void addDeviceStatus(GeneratedMessageV3.Builder builder) {
deviceStatusBuilder.put(getIdVal(builder), builder);
public void addDeviceStatusData(String lineCode, Builder builder) {
DeviceStatusData data = lineStatusDataMap.get(lineCode);
if (data == null) {
data = new DeviceStatusData(lineCode);
lineStatusDataMap.put(lineCode, data);
}
DeviceStatusDataOperate.addDevice(builder, data);
}
/**
* 对比获取设备变化了的状态信息
* 采集线路状态数据
*
* @return 发生变化的状态信息
* @param lineCode 线路编码
* @return 消息数据
*/
public static List<Message> collectAllCompare() {
List<Message> resultMessage = new LinkedList<>();
deviceStatusBuilder.forEach(
(k, v) -> {
Message message = null;
if (statusDataVOMap.containsKey(k)) {
message = v.build();
if (!statusDataVOMap.get(k).build().equals(message)) { // 判断
statusDataVOMap.get(k).mergeFrom(message);
} else {
message = null;
}
} else {
message = v.build();
statusDataVOMap.put(k, v.clone());
}
if (message != null) {
resultMessage.add(message);
}
});
return resultMessage;
public Message collectLineAllMessage(String lineCode) {
return DeviceStatusCollect.collectAllStatus(
WsLineMessage.newBuilder(), List.of(lineStatusDataMap.get(lineCode)));
}
/**
* 获取主键值
* 采集增量线路数据
*
* @param statusProto 数据信息
* @return 主键
* @param lineCode 线路编码
* @return 消息数据
*/
private static String getIdVal(GeneratedMessageV3.Builder statusProto) {
return String.valueOf(
statusProto.getField(statusProto.getDescriptorForType().findFieldByName(ID_NAME)));
public Message collectLineMessage(String lineCode) {
return DeviceStatusCollect.collectIncrementStatus(
WsLineMessage.newBuilder(), List.of(lineStatusDataMap.get(lineCode)));
}
/**
* 发送线网消息数据
*
* @return 消息数据
*/
public Message collectLineNetAllMessage() {
return DeviceStatusCollect.collectAllStatus(
WsLineNetMessage.newBuilder(), lineStatusDataMap.values().stream().toList());
}
/**
* 发送线网增量消息数据
*
* @return 消息数据
*/
public Message collectLineNetMessage() {
return DeviceStatusCollect.collectIncrementStatus(
WsLineNetMessage.newBuilder(), lineStatusDataMap.values().stream().toList());
}
}

@ -1 +1 @@
Subproject commit f0cdf851ebfe03ceb073e1c55706f3ccb49e73c1
Subproject commit 2697dec89345cec7bdfdb7f63b0b4872db30cbd3