【修改线路时,数据加锁】

This commit is contained in:
weizhihong 2023-06-21 15:52:36 +08:00
parent 28d55ef8db
commit 06b5f499c4
3 changed files with 63 additions and 34 deletions

View File

@ -4,6 +4,8 @@ import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter; import lombok.Getter;
/** 线路设备状态信息 */ /** 线路设备状态信息 */
@ -19,6 +21,9 @@ public class DeviceStatusData {
/** 增量的设备更新信息 */ /** 增量的设备更新信息 */
private Map<String, Map<String, Message>> statusVOMap = new ConcurrentHashMap<>(); private Map<String, Map<String, Message>> statusVOMap = new ConcurrentHashMap<>();
/** 同步锁 */
private Lock lock = new ReentrantLock();
public DeviceStatusData(String lineCode) { public DeviceStatusData(String lineCode) {
this.lineCode = lineCode; this.lineCode = lineCode;
} }

View File

@ -20,6 +20,62 @@ public abstract class DeviceStatusDataOperate {
private static final Map<String, String> DEVICE_ID_NAME_MAP = private static final Map<String, String> DEVICE_ID_NAME_MAP =
Map.of("LineNetTrainOffsetDiagram", "groupId", "Train", "groupId"); Map.of("LineNetTrainOffsetDiagram", "groupId", "Train", "groupId");
/**
* 批量放入设备状态
*
* @param builders 要添加的设备状态列表
* @param data 存储内存
*/
public static void addDevices(List<Builder> builders, DeviceStatusData data) {
if (data == null || builders == null) {
return;
}
data.getLock().lock(); // 对该线路数据上锁
try {
// 获取全量数据
Map<String, Map<String, Builder>> allDeviceMap = data.getAllDeviceMap();
// 最后增量数据
Map<String, Map<String, Message>> statusVOMap = new ConcurrentHashMap<>();
builders.stream()
.collect(Collectors.groupingBy(DeviceStatusDataOperate::findType))
.forEach(
(k, v) -> {
// 当前的设备状态
Map<String, Builder> deviceStatusMap =
allDeviceMap.computeIfAbsent(k, key -> new ConcurrentHashMap<>(v.size()));
// 新的设备状态
Map<String, Builder> newDeviceMap =
v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b));
// 对比结果
Map<String, Message> compareMap = compare(deviceStatusMap, newDeviceMap);
if (!CollectionUtils.isEmpty(compareMap)) {
statusVOMap.put(k, compareMap);
}
});
if (!CollectionUtils.isEmpty(statusVOMap)) {
data.getStatusVOMap().clear();
data.getStatusVOMap().putAll(statusVOMap);
}
} finally {
data.getLock().unlock();
}
}
/**
* 在能得到锁的情况清除增量状态
*
* @param data 设备状态数据
*/
public static void clearStatusVOMap(DeviceStatusData data) {
if (data.getLock().tryLock()) {
try {
data.getStatusVOMap().clear();
} finally {
data.getLock().unlock();
}
}
}
/** /**
* 获取设备状态类型 * 获取设备状态类型
* *
@ -42,39 +98,6 @@ public abstract class DeviceStatusDataOperate {
protoBuilder.getField(protoBuilder.getDescriptorForType().findFieldByName(idName))); protoBuilder.getField(protoBuilder.getDescriptorForType().findFieldByName(idName)));
} }
/**
* 批量放入设备状态
*
* @param builders 要添加的设备状态列表
* @param data 存储内存
*/
public static void addDevices(List<Builder> builders, DeviceStatusData data) {
if (data == null || builders == null) {
return;
}
// 获取全量数据
Map<String, Map<String, Builder>> allDeviceMap = data.getAllDeviceMap();
// 最后增量数据
Map<String, Map<String, Message>> statusVOMap = new ConcurrentHashMap<>();
builders.stream()
.collect(Collectors.groupingBy(DeviceStatusDataOperate::findType))
.forEach(
(k, v) -> {
// 当前的设备状态
Map<String, Builder> deviceStatusMap =
allDeviceMap.computeIfAbsent(k, key -> new ConcurrentHashMap<>(v.size()));
// 新的设备状态
Map<String, Builder> newDeviceMap =
v.stream().collect(Collectors.toMap(DeviceStatusDataOperate::getIdVal, b -> b));
// 对比结果
Map<String, Message> compareMap = compare(deviceStatusMap, newDeviceMap);
if (!CollectionUtils.isEmpty(compareMap)) {
statusVOMap.put(k, compareMap);
}
});
data.getStatusVOMap().putAll(statusVOMap);
}
/** /**
* 对比获取设备变化了的状态信息并更新状态信息 * 对比获取设备变化了的状态信息并更新状态信息
* *

View File

@ -1,6 +1,7 @@
package club.joylink.xiannccda.ws; package club.joylink.xiannccda.ws;
import club.joylink.xiannccda.ats.message.collect.DeviceStatusData; import club.joylink.xiannccda.ats.message.collect.DeviceStatusData;
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataOperate;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineNetMessage; import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineNetMessage;
import com.google.protobuf.Descriptors.FieldDescriptor.Type; import com.google.protobuf.Descriptors.FieldDescriptor.Type;
import com.google.protobuf.Message; import com.google.protobuf.Message;
@ -61,7 +62,7 @@ public class LineNetMessageServer implements IMessageServer {
(fieldType) -> dataSource.getStatusVOMap().getOrDefault(fieldType, Map.of())); (fieldType) -> dataSource.getStatusVOMap().getOrDefault(fieldType, Map.of()));
byte[] bytes = message.toByteArray(); byte[] bytes = message.toByteArray();
// 清空增量 // 清空增量
dataSource.getStatusVOMap().clear(); DeviceStatusDataOperate.clearStatusVOMap(dataSource);
log.info("【线网订阅】数据状态变化量:" + bytes.length); log.info("【线网订阅】数据状态变化量:" + bytes.length);
topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes)); topicMessages.add(new TopicMessage(this.getDestinationPattern(), bytes));
} }