模拟数据加载及线路设备,列车信息
This commit is contained in:
parent
6e7a720425
commit
909bd0bcac
@ -6,13 +6,33 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|
||||||
/** 状态信息初始化 */
|
/**
|
||||||
|
* 状态信息初始化
|
||||||
|
*/
|
||||||
@Getter
|
@Getter
|
||||||
public abstract class DeviceStatusDataRepository {
|
public abstract class DeviceStatusDataRepository {
|
||||||
|
|
||||||
/** 线路设备状态信息集合 */
|
/**
|
||||||
|
* 线路设备状态信息集合
|
||||||
|
*/
|
||||||
private static final Map<String, DeviceStatusData> lineStatusDataMap = new ConcurrentHashMap<>();
|
private static final Map<String, DeviceStatusData> lineStatusDataMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private static final Map<String, TrainInfoData> trainDataMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
|
||||||
|
public static void addTrain(String lineCode, List<Builder> builders) {
|
||||||
|
TrainInfoData data = trainDataMap.computeIfAbsent(lineCode, TrainInfoData::new);
|
||||||
|
for (Builder builder : builders) {
|
||||||
|
data.addTrain(builder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static TrainInfoData getAllTrainInfo(String lineId) {
|
||||||
|
return trainDataMap.computeIfAbsent(lineId, TrainInfoData::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 批量增加
|
* 批量增加
|
||||||
*
|
*
|
||||||
|
@ -0,0 +1,88 @@
|
|||||||
|
package club.joylink.xiannccda.ats.message.collect;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.dto.protos.TrainProto.TrainInfo;
|
||||||
|
import com.google.common.collect.HashBasedTable;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.collect.Table;
|
||||||
|
import com.google.common.collect.Tables;
|
||||||
|
import com.google.protobuf.Descriptors.FieldDescriptor;
|
||||||
|
import com.google.protobuf.GeneratedMessageV3.Builder;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 线路设备状态信息
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class TrainInfoData {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 线路号
|
||||||
|
*/
|
||||||
|
private final String lineCode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 所有设备状态信息 <列车类型 【这里用的message的类名:TrainInfo等】,<车组号,列车信息>>
|
||||||
|
*/
|
||||||
|
private Map<String, TrainInfo.Builder> allTrainInfoMaper = Maps.newConcurrentMap();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 增量的设备更新信息
|
||||||
|
*/
|
||||||
|
private Table<String, String, Message> trainUpdate = Tables.synchronizedTable(HashBasedTable.create());
|
||||||
|
private final static String TRAIN_INFO_NAME = "TrainInfo";
|
||||||
|
private final static String TRAIN_INFO_ID = "groupId";
|
||||||
|
|
||||||
|
public TrainInfoData(String lineCode) {
|
||||||
|
this.lineCode = lineCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void addTrain(Builder builder) {
|
||||||
|
Object idVal = builder.getField(builder.getDescriptorForType().findFieldByName(TRAIN_INFO_ID));
|
||||||
|
String typeName = builder.getDescriptorForType().getName();
|
||||||
|
Message message = builder.build();
|
||||||
|
if (StringUtils.equals(typeName, TRAIN_INFO_NAME)) {
|
||||||
|
Builder trainBuild = allTrainInfoMaper.get(idVal.toString());
|
||||||
|
if (Objects.isNull(trainBuild)) {
|
||||||
|
trainBuild = builder;
|
||||||
|
} else {
|
||||||
|
trainBuild.mergeFrom(message);
|
||||||
|
}
|
||||||
|
if (trainBuild instanceof TrainInfo.Builder trainInfo) {
|
||||||
|
trainInfo.clearBlock();
|
||||||
|
trainInfo.clearRemove();
|
||||||
|
trainInfo.clearRecord();
|
||||||
|
allTrainInfoMaper.put(idVal.toString(), trainInfo);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trainUpdate.put(typeName, idVal.toString(), message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void fillBlockOrRecordOrRemove(TrainInfo.Builder trainInfo, String key, String filedName) {
|
||||||
|
Message message = this.trainUpdate.get(key, trainInfo.getGroupId());
|
||||||
|
FieldDescriptor field = trainInfo.getDescriptorForType().findFieldByName(filedName);
|
||||||
|
if (Objects.nonNull(message)) {
|
||||||
|
// trainInfo.clearField(field);
|
||||||
|
trainInfo.setField(field, message);
|
||||||
|
this.trainUpdate.remove(key, trainInfo.getGroupId());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reput(TrainInfo.Builder trainInfo) {
|
||||||
|
if (trainInfo.hasRemove()) {
|
||||||
|
this.allTrainInfoMaper.remove(trainInfo.getGroupId());
|
||||||
|
} else {
|
||||||
|
this.allTrainInfoMaper.put(trainInfo.getGroupId(), trainInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<TrainInfo.Builder> getAllTrain() {
|
||||||
|
return this.allTrainInfoMaper.values();
|
||||||
|
}
|
||||||
|
}
|
@ -114,12 +114,8 @@ public class DeviceStatusConvertor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void fillField(GeneratedMessageV3.Builder build, String attrName, Object vals) {
|
public static void fillField(GeneratedMessageV3.Builder build, String attrName, Object vals) {
|
||||||
Optional<FieldDescriptor> fd = build.getDescriptorForType().getFields().stream().filter(d -> StringUtils.equals(d.getName(), attrName)).findFirst();
|
FieldDescriptor fieldId = build.getDescriptorForType().findFieldByName(attrName);
|
||||||
if (fd.isEmpty()) {
|
build.setField(fieldId, vals);
|
||||||
throw new RuntimeException(String.format("未找到对应的属性 build:[%s] attrName:[%s],val:[%s]", build.getClass().getName(), attrName, vals));
|
|
||||||
}
|
|
||||||
FieldDescriptor ffdd = fd.get();
|
|
||||||
build.setField(ffdd, vals);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void copyAttr(GeneratedMessageV3.Builder build, String attrName, Object... vals) {
|
public static void copyAttr(GeneratedMessageV3.Builder build, String attrName, Object... vals) {
|
||||||
|
@ -5,7 +5,6 @@ import club.joylink.xiannccda.ats.message.convertor.DeviceStatusConvertor;
|
|||||||
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus;
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus;
|
||||||
import club.joylink.xiannccda.ats.message.line3.device.DeviceType;
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceType;
|
||||||
import club.joylink.xiannccda.ats.message.line3.rep.EntityParseUtil.ReadData;
|
import club.joylink.xiannccda.ats.message.line3.rep.EntityParseUtil.ReadData;
|
||||||
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.protobuf.GeneratedMessageV3;
|
import com.google.protobuf.GeneratedMessageV3;
|
||||||
import com.google.protobuf.GeneratedMessageV3.Builder;
|
import com.google.protobuf.GeneratedMessageV3.Builder;
|
||||||
|
@ -83,22 +83,6 @@ public class DeviceStatusChangeResponse extends MessageResponse {
|
|||||||
this.spare = buf.readInt();
|
this.spare = buf.readInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
GeneratedMessageV3.Builder rtuBuild = DeviceStatusProto.Rtu.newBuilder();
|
|
||||||
|
|
||||||
// rtuBuild.setId("asd");
|
|
||||||
rtuBuild.getDescriptorForType().getFields().forEach(d -> System.out.println(d.getName()));
|
|
||||||
Optional<FieldDescriptor> fd = rtuBuild.getDescriptorForType().getFields().stream().filter(d -> StringUtils.equals(d.getName(), "id")).findFirst();
|
|
||||||
if (fd.isEmpty()) {
|
|
||||||
|
|
||||||
}
|
|
||||||
FieldDescriptor ffdd = fd.get();
|
|
||||||
rtuBuild.setField(ffdd, "asdfasdf");
|
|
||||||
|
|
||||||
System.out.println(rtuBuild.getAllFields());
|
|
||||||
System.out.println(rtuBuild.getClass().getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Builder> generateProto() throws Exception {
|
public List<Builder> generateProto() throws Exception {
|
||||||
GeneratedMessageV3.Builder builder = DeviceStatusConvertor.convert(this.type, this.deviceStatus);
|
GeneratedMessageV3.Builder builder = DeviceStatusConvertor.convert(this.type, this.deviceStatus);
|
||||||
|
@ -5,80 +5,119 @@ import club.joylink.xiannccda.ats.message.convertor.DeviceStatusConvertor;
|
|||||||
import club.joylink.xiannccda.ats.message.line3.DateTimeUtil;
|
import club.joylink.xiannccda.ats.message.line3.DateTimeUtil;
|
||||||
import club.joylink.xiannccda.ats.message.line3.MessageCons;
|
import club.joylink.xiannccda.ats.message.line3.MessageCons;
|
||||||
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus;
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus;
|
||||||
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.TRAIN_MODE;
|
|
||||||
import club.joylink.xiannccda.ats.message.line3.device.DeviceType;
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceType;
|
||||||
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
|
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
|
||||||
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.TrainMode;
|
|
||||||
import club.joylink.xiannccda.dto.protos.TrainProto;
|
import club.joylink.xiannccda.dto.protos.TrainProto;
|
||||||
import club.joylink.xiannccda.dto.protos.TrainProto.NccWindow;
|
import club.joylink.xiannccda.dto.protos.TrainProto.NccWindow;
|
||||||
import club.joylink.xiannccda.dto.protos.TrainProto.TrainInfo;
|
import club.joylink.xiannccda.dto.protos.TrainProto.TrainInfo;
|
||||||
import com.google.common.base.CaseFormat;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.protobuf.GeneratedMessageV3.Builder;
|
import com.google.protobuf.GeneratedMessageV3.Builder;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
|
|
||||||
/** 2.7.10 列车信息更新消息[增加/更新] */
|
/**
|
||||||
|
* 2.7.10 列车信息更新消息[增加/更新]
|
||||||
|
*/
|
||||||
@Getter
|
@Getter
|
||||||
@Setter
|
@Setter
|
||||||
public class TrainIndicationUpdateResponse extends MessageResponse {
|
public class TrainIndicationUpdateResponse extends MessageResponse {
|
||||||
|
|
||||||
/** 线路号(2) */
|
/**
|
||||||
|
* 线路号(2)
|
||||||
|
*/
|
||||||
private Short lineId;
|
private Short lineId;
|
||||||
/**
|
/**
|
||||||
* 车次号变化状态(2) 0x01:增加<br>
|
* 车次号变化状态(2) 0x01:增加<br> 0x02:更新 <br> true-更新,false-增加
|
||||||
* 0x02:更新 <br>
|
|
||||||
* true-更新,false-增加
|
|
||||||
*/
|
*/
|
||||||
private Boolean type;
|
private Boolean type;
|
||||||
/** 集中站站号(2) */
|
/**
|
||||||
|
* 集中站站号(2)
|
||||||
|
*/
|
||||||
private Short rtuId;
|
private Short rtuId;
|
||||||
/** NCC车次窗编号(2) */
|
/**
|
||||||
|
* NCC车次窗编号(2)
|
||||||
|
*/
|
||||||
private Short nccWindow;
|
private Short nccWindow;
|
||||||
/** 列车在车次窗中的位置(1) */
|
/**
|
||||||
|
* 列车在车次窗中的位置(1)
|
||||||
|
*/
|
||||||
private Byte nccWindowOffset;
|
private Byte nccWindowOffset;
|
||||||
/** 列车所在的设备的类型(2) */
|
/**
|
||||||
|
* 列车所在的设备的类型(2)
|
||||||
|
*/
|
||||||
private DeviceType devType;
|
private DeviceType devType;
|
||||||
/** 列车所在的设备的名称(24) */
|
/**
|
||||||
|
* 列车所在的设备的名称(24)
|
||||||
|
*/
|
||||||
private String devName;
|
private String devName;
|
||||||
/** 列车标示号,全线唯一(若无法提供,缺省值为0)(10) */
|
/**
|
||||||
|
* 列车标示号,全线唯一(若无法提供,缺省值为0)(10)
|
||||||
|
*/
|
||||||
private String trainIndex;
|
private String trainIndex;
|
||||||
/** 列车编组号(9) */
|
/**
|
||||||
|
* 列车编组号(9)
|
||||||
|
*/
|
||||||
private String groupId;
|
private String groupId;
|
||||||
/** 表号(9) */
|
/**
|
||||||
|
* 表号(9)
|
||||||
|
*/
|
||||||
private String trainId;
|
private String trainId;
|
||||||
/** 车次号(12) */
|
/**
|
||||||
|
* 车次号(12)
|
||||||
|
*/
|
||||||
private String globalId;
|
private String globalId;
|
||||||
/** 目的地号(4) */
|
/**
|
||||||
|
* 目的地号(4)
|
||||||
|
*/
|
||||||
private Integer destinationId;
|
private Integer destinationId;
|
||||||
/** 编组数量(1) */
|
/**
|
||||||
|
* 编组数量(1)
|
||||||
|
*/
|
||||||
private byte rollingStock;
|
private byte rollingStock;
|
||||||
/** 司机号(13) */
|
/**
|
||||||
|
* 司机号(13)
|
||||||
|
*/
|
||||||
private String driverId;
|
private String driverId;
|
||||||
/** 运行路径号(若无法提供,缺省值为0)(2) */
|
/**
|
||||||
|
* 运行路径号(若无法提供,缺省值为0)(2)
|
||||||
|
*/
|
||||||
private Short routeId;
|
private Short routeId;
|
||||||
/** 计划偏离时间(4) */
|
/**
|
||||||
|
* 计划偏离时间(4)
|
||||||
|
*/
|
||||||
private Integer optTime;
|
private Integer optTime;
|
||||||
/** 列车状态,见附录6.3.14列车状态定义(4) */
|
/**
|
||||||
|
* 列车状态,见附录6.3.14列车状态定义(4)
|
||||||
|
*/
|
||||||
private Integer mode;
|
private Integer mode;
|
||||||
/** 列车到点(7) */
|
/**
|
||||||
|
* 列车到点(7)
|
||||||
|
*/
|
||||||
private LocalDateTime arriveTime;
|
private LocalDateTime arriveTime;
|
||||||
/** 列车发点(7) */
|
/**
|
||||||
|
* 列车发点(7)
|
||||||
|
*/
|
||||||
private LocalDateTime departTime;
|
private LocalDateTime departTime;
|
||||||
/** 满载率(百分比,例如50,表示满载率为50%)(4) */
|
/**
|
||||||
|
* 满载率(百分比,例如50,表示满载率为50%)(4)
|
||||||
|
*/
|
||||||
private Integer rate;
|
private Integer rate;
|
||||||
/** 速度(KM/H)(1) */
|
/**
|
||||||
|
* 速度(KM/H)(1)
|
||||||
|
*/
|
||||||
private byte speed;
|
private byte speed;
|
||||||
/** 预留(2) */
|
/**
|
||||||
|
* 预留(2)
|
||||||
|
*/
|
||||||
private byte[] reserve = new byte[2];
|
private byte[] reserve = new byte[2];
|
||||||
|
|
||||||
/** 列车是否有 trainMode对应的状态 */
|
/**
|
||||||
|
* 列车是否有 trainMode对应的状态
|
||||||
|
*/
|
||||||
public boolean havingState(final DeviceStatus.TRAIN_MODE trainMode) {
|
public boolean havingState(final DeviceStatus.TRAIN_MODE trainMode) {
|
||||||
return trainMode.is(mode);
|
return trainMode.is(mode);
|
||||||
}
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,103 @@
|
|||||||
|
package club.joylink.xiannccda.mock.message;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageData;
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageId;
|
||||||
|
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataRepository;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData.ActionTypeEnum;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData.MsgTypeEnum;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockDataService.DataType;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
|
import com.google.protobuf.GeneratedMessageV3;
|
||||||
|
import com.google.protobuf.GeneratedMessageV3.Builder;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
import org.springframework.boot.ApplicationRunner;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class MockLoadData implements ApplicationRunner {
|
||||||
|
|
||||||
|
static final ScheduledExecutorService CIRCLE_QUERY_THREAD = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
private final NccMockDataService nccMockDataService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(ApplicationArguments args) throws Exception {
|
||||||
|
this.nccMockDataService.reset(1);
|
||||||
|
this.loadAllDevice();
|
||||||
|
this.loadAllTrain();
|
||||||
|
|
||||||
|
CIRCLE_QUERY_THREAD.scheduleAtFixedRate(() -> {
|
||||||
|
this.loadDevice();
|
||||||
|
this.loadTrain();
|
||||||
|
}, 1500, 1500, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadAllDevice() {
|
||||||
|
Wrapper<NccMockData> wrapper = Wrappers.<NccMockData>lambdaQuery()
|
||||||
|
.eq(NccMockData::getActionType, ActionTypeEnum.ALL.name())
|
||||||
|
.eq(NccMockData::getMsgType, MsgTypeEnum.REAL_TIME.name());
|
||||||
|
List<MessageData> dataList = this.nccMockDataService.loadALLData(wrapper);
|
||||||
|
try {
|
||||||
|
for (MessageData messageData : dataList) {
|
||||||
|
List<GeneratedMessageV3.Builder> builders = messageData.generateProto();
|
||||||
|
DeviceStatusDataRepository.addDeviceStatusDataList("3", builders);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadAllTrain() {
|
||||||
|
Wrapper<NccMockData> wrapper = Wrappers.<NccMockData>lambdaQuery()
|
||||||
|
.eq(NccMockData::getActionType, ActionTypeEnum.ALL.name())
|
||||||
|
.eq(NccMockData::getMsgType, MsgTypeEnum.REAL_TIME.name())
|
||||||
|
.eq(NccMockData::getMsgId, MessageId.TRAIN_INDICATION_INIT.name());
|
||||||
|
List<MessageData> dataList = this.nccMockDataService.loadALLData(wrapper);
|
||||||
|
try {
|
||||||
|
|
||||||
|
for (MessageData messageData : dataList) {
|
||||||
|
List<Builder> builders = messageData.generateProto();
|
||||||
|
DeviceStatusDataRepository.addTrain("3", builders);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadTrain() {
|
||||||
|
List<MessageData> messageDataList = this.nccMockDataService.getMessageData(DataType.TRAIN);
|
||||||
|
if (CollectionUtils.isEmpty(messageDataList)) {
|
||||||
|
System.out.println("没有新的数据");
|
||||||
|
}
|
||||||
|
for (MessageData messageData : messageDataList) {
|
||||||
|
try {
|
||||||
|
DeviceStatusDataRepository.addTrain("3", messageData.generateProto());
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadDevice() {
|
||||||
|
List<MessageData> messageDataList = this.nccMockDataService.getMessageData(DataType.TRAIN);
|
||||||
|
if (CollectionUtils.isEmpty(messageDataList)) {
|
||||||
|
System.out.println("没有新的数据");
|
||||||
|
}
|
||||||
|
for (MessageData messageData : messageDataList) {
|
||||||
|
try {
|
||||||
|
DeviceStatusDataRepository.addDeviceStatusDataList("3", messageData.generateProto());
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -38,6 +38,8 @@ public class NccMockData {
|
|||||||
|
|
||||||
private String actionType;
|
private String actionType;
|
||||||
|
|
||||||
|
private String msgId;
|
||||||
|
|
||||||
public static final String ID = "id";
|
public static final String ID = "id";
|
||||||
|
|
||||||
public static final String MOCK_DATA = "mock_data";
|
public static final String MOCK_DATA = "mock_data";
|
||||||
@ -47,6 +49,8 @@ public class NccMockData {
|
|||||||
public static final String MSG_TYPE = "msg_type";
|
public static final String MSG_TYPE = "msg_type";
|
||||||
public static final String ACTION_TYPE = "action_type";
|
public static final String ACTION_TYPE = "action_type";
|
||||||
|
|
||||||
|
public static final String MSG_ID = "msg_id";
|
||||||
|
|
||||||
public enum MsgTypeEnum {
|
public enum MsgTypeEnum {
|
||||||
REAL_TIME, UNREAL_TIME
|
REAL_TIME, UNREAL_TIME
|
||||||
}
|
}
|
||||||
|
@ -10,12 +10,14 @@ import club.joylink.xiannccda.mock.message.NccMockData.MsgTypeEnum;
|
|||||||
import club.joylink.xiannccda.repository.impl.NccMockDataRepository;
|
import club.joylink.xiannccda.repository.impl.NccMockDataRepository;
|
||||||
import com.alibaba.fastjson2.JSONArray;
|
import com.alibaba.fastjson2.JSONArray;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import lombok.Getter;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -58,19 +60,18 @@ public class NccMockDataService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<MessageData> loadALLData() {
|
public List<MessageData> loadALLData(Wrapper<NccMockData> wrapp) {
|
||||||
List<NccMockData> nccMockData = this.nccMockDataRepository.list(Wrappers.<NccMockData>lambdaQuery()
|
List<NccMockData> nccMockData = this.nccMockDataRepository.list(wrapp);
|
||||||
.eq(NccMockData::getActionType, ActionTypeEnum.ALL.name())
|
|
||||||
.eq(NccMockData::getMsgType, MsgTypeEnum.REAL_TIME.name()));
|
|
||||||
List<MessageData> list = this.parse(nccMockData);
|
List<MessageData> list = this.parse(nccMockData);
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<MessageData> getMessageData() {
|
public List<MessageData> getMessageData(DataType dataType) {
|
||||||
if (this.isLast()) {
|
if (this.isLast()) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
List<NccMockData> list = this.findData();
|
List<NccMockData> list = this.findData(dataType);
|
||||||
if (CollectionUtils.isEmpty(list)) {
|
if (CollectionUtils.isEmpty(list)) {
|
||||||
FINISH = true;
|
FINISH = true;
|
||||||
} else {
|
} else {
|
||||||
@ -96,13 +97,31 @@ public class NccMockDataService {
|
|||||||
dataList.add(md);
|
dataList.add(md);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<NccMockData> findData() {
|
private List<NccMockData> findData(DataType dataType) {
|
||||||
QueryWrapper<NccMockData> queryWrapper = Wrappers.query();
|
QueryWrapper<NccMockData> queryWrapper = Wrappers.query();
|
||||||
queryWrapper.gt(NccMockData.ID, LAST_ID);
|
queryWrapper.gt(NccMockData.ID, LAST_ID);
|
||||||
queryWrapper.eq(NccMockData.MSG_TYPE, MsgTypeEnum.REAL_TIME.name());
|
queryWrapper.eq(NccMockData.MSG_TYPE, MsgTypeEnum.REAL_TIME.name());
|
||||||
queryWrapper.eq(NccMockData.ACTION_TYPE, ActionTypeEnum.UPDATES.name());
|
queryWrapper.eq(NccMockData.ACTION_TYPE, ActionTypeEnum.UPDATES.name());
|
||||||
|
if (!CollectionUtils.isEmpty(dataType.getTypeList())) {
|
||||||
|
queryWrapper.in(NccMockData.MSG_ID, dataType.getTypeList());
|
||||||
|
}
|
||||||
queryWrapper.last(String.format(" limit %s", PAGE_SIZE));
|
queryWrapper.last(String.format(" limit %s", PAGE_SIZE));
|
||||||
|
queryWrapper.orderByAsc(NccMockData.ID);
|
||||||
List<NccMockData> list = this.nccMockDataRepository.list(queryWrapper);
|
List<NccMockData> list = this.nccMockDataRepository.list(queryWrapper);
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public enum DataType {
|
||||||
|
DEVICE(List.of("DEVICE_STATUS_CHANGE", "SIGNAL_ROUTE_STATUS", "DEVICE_STATUS_CHANGE")),
|
||||||
|
TRAIN(List.of("TRAIN_RECORD", "TRAIN_INDICATION_UPDATE", "TRAIN_INDICATION_REMOVE", "TRAIN_BLOCK_INFO")),
|
||||||
|
ALL(List.of());
|
||||||
|
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
List<String> typeList;
|
||||||
|
|
||||||
|
DataType(List<String> typeList) {
|
||||||
|
this.typeList = typeList;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ public class MockMessageDataHandle {
|
|||||||
NccMockData td = new NccMockData();
|
NccMockData td = new NccMockData();
|
||||||
td.setMockData(JSON.toJSONString(data));
|
td.setMockData(JSON.toJSONString(data));
|
||||||
td.setMockReceiveTime(localDateTime);
|
td.setMockReceiveTime(localDateTime);
|
||||||
|
td.setMsgId(data.getMsgId().name());
|
||||||
this.setMsgType(td, data);
|
this.setMsgType(td, data);
|
||||||
iNccMockDataRepository.save(td);
|
iNccMockDataRepository.save(td);
|
||||||
}
|
}
|
||||||
|
@ -1,164 +0,0 @@
|
|||||||
package club.joylink.xiannccda.mock.message.ws;
|
|
||||||
|
|
||||||
import club.joylink.xiannccda.ats.message.MessageData;
|
|
||||||
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataOperate;
|
|
||||||
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataRepository;
|
|
||||||
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
|
|
||||||
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineNetMessage;
|
|
||||||
import club.joylink.xiannccda.mock.message.NccMockDataService;
|
|
||||||
import club.joylink.xiannccda.ws.IMessageServer;
|
|
||||||
import club.joylink.xiannccda.ws.TopicMessage;
|
|
||||||
import club.joylink.xiannccda.ws.WsMessageServerManager;
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
|
|
||||||
import com.google.protobuf.GeneratedMessageV3;
|
|
||||||
import com.google.protobuf.GeneratedMessageV3.Builder;
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import jakarta.annotation.PostConstruct;
|
|
||||||
import java.text.MessageFormat;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import org.springframework.util.AntPathMatcher;
|
|
||||||
import org.springframework.util.CollectionUtils;
|
|
||||||
import org.springframework.util.PropertyPlaceholderHelper;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
@Slf4j
|
|
||||||
public class MockDeviceMessageServer implements IMessageServer {
|
|
||||||
|
|
||||||
|
|
||||||
private final WsMessageServerManager serverManager;
|
|
||||||
private final NccMockDataService nccMockDataService;
|
|
||||||
|
|
||||||
private final static String SUBSCRIPTION_PATH = "/queue/line/{lineId}";
|
|
||||||
|
|
||||||
private final Set<String> lineIdSet = Sets.newHashSet();
|
|
||||||
|
|
||||||
private static PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper(
|
|
||||||
"{", "}", ":", false);
|
|
||||||
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
|
|
||||||
List<MessageData> dataList = this.nccMockDataService.loadALLData();
|
|
||||||
try {
|
|
||||||
for (MessageData messageData : dataList) {
|
|
||||||
List<GeneratedMessageV3.Builder> builders = messageData.generateProto();
|
|
||||||
DeviceStatusDataRepository.addDeviceStatusDataList("3", builders);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
this.serverManager.registerMessageServer(this);
|
|
||||||
this.nccMockDataService.reset(1);
|
|
||||||
this.circleQueryMockData();
|
|
||||||
}
|
|
||||||
|
|
||||||
static final ScheduledExecutorService CIRCLE_QUERY_THREAD = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
|
|
||||||
private void circleQueryMockData() {
|
|
||||||
CIRCLE_QUERY_THREAD.scheduleAtFixedRate(() -> {
|
|
||||||
if (lineIdSet.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<MessageData> messageDataList = this.nccMockDataService.getMessageData();
|
|
||||||
if (CollectionUtils.isEmpty(messageDataList)) {
|
|
||||||
System.out.println("没有新的数据");
|
|
||||||
}
|
|
||||||
for (MessageData messageData : messageDataList) {
|
|
||||||
try {
|
|
||||||
DeviceStatusDataRepository.addDeviceStatusDataList("3", messageData.generateProto());
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, this.getInterval(), this.getInterval(), TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDestinationPattern() {
|
|
||||||
return SUBSCRIPTION_PATH;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Object onSubscription(String destination, Map<String, String> paramMap) {
|
|
||||||
String lineId = paramMap.get("lineId");
|
|
||||||
lineIdSet.add(lineId);
|
|
||||||
|
|
||||||
log.info("线路lineId={}订阅,发布全量数据", lineId);
|
|
||||||
DeviceStatusDataRepository.getDeviceStatusData(lineId).getAllDeviceMap();
|
|
||||||
WsLineMessage.Builder builder = WsLineMessage.newBuilder();
|
|
||||||
fillBuilderFunction(fun -> {
|
|
||||||
Map<String, Builder> builderMap = DeviceStatusDataRepository.getDeviceStatusData(lineId).getAllDeviceMap().get(fun);
|
|
||||||
if (Objects.isNull(builderMap)) {
|
|
||||||
return Map.of();
|
|
||||||
}
|
|
||||||
Map<String, Message> messageMap = Maps.newHashMap();
|
|
||||||
builderMap.forEach((k, v) -> messageMap.put(k, v.build()));
|
|
||||||
return messageMap;
|
|
||||||
}, builder);
|
|
||||||
WsLineMessage buildMsg = builder.build();
|
|
||||||
return buildMsg.toByteArray();
|
|
||||||
|
|
||||||
// return new TopicMessage(destination, buildMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getInterval() {
|
|
||||||
return 1500;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<TopicMessage> onTick() {
|
|
||||||
if (this.lineIdSet.isEmpty()) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
Properties properties = new Properties();
|
|
||||||
List<TopicMessage> messages = Lists.newArrayList();
|
|
||||||
for (String lineId : this.lineIdSet) {
|
|
||||||
WsLineMessage.Builder msg = WsLineMessage.newBuilder();
|
|
||||||
fillBuilderFunction((field) -> DeviceStatusDataRepository.getDeviceStatusData(lineId).getStatusVOMap().get(field), msg);
|
|
||||||
DeviceStatusDataOperate.clearStatusVOMap(DeviceStatusDataRepository.getDeviceStatusData(lineId));
|
|
||||||
properties.put("lineId", lineId);
|
|
||||||
String destination = helper.replacePlaceholders(SUBSCRIPTION_PATH, properties);
|
|
||||||
messages.add(new TopicMessage(destination, msg.build().toByteArray()));
|
|
||||||
properties.clear();
|
|
||||||
|
|
||||||
}
|
|
||||||
return messages;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void fillBuilderFunction(Function<String, Map<String, Message>> compareFun, GeneratedMessageV3.Builder builder) {
|
|
||||||
// 消息体字段列表
|
|
||||||
builder.getDescriptorForType().getFields().stream()
|
|
||||||
.filter(f -> f.getType().equals(Type.MESSAGE))
|
|
||||||
.forEach(
|
|
||||||
field -> {
|
|
||||||
String fieldType = field.getMessageType().getName(); // 字段类型
|
|
||||||
Map<String, Message> allDeviceMap = compareFun.apply(fieldType);
|
|
||||||
if (!CollectionUtils.isEmpty(allDeviceMap)) {
|
|
||||||
builder.setField(field, new ArrayList<>(allDeviceMap.values()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,13 +6,17 @@ import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataRepository;
|
|||||||
import club.joylink.xiannccda.ats.message.collect.convertor.LineNetTrainInitConvertor;
|
import club.joylink.xiannccda.ats.message.collect.convertor.LineNetTrainInitConvertor;
|
||||||
import club.joylink.xiannccda.ats.message.collect.convertor.LineNetTrainRecordConvertor;
|
import club.joylink.xiannccda.ats.message.collect.convertor.LineNetTrainRecordConvertor;
|
||||||
import club.joylink.xiannccda.ws.IMessageServer;
|
import club.joylink.xiannccda.ws.IMessageServer;
|
||||||
|
import club.joylink.xiannccda.ws.LineDeviceMessageServer;
|
||||||
import club.joylink.xiannccda.ws.LineNetMessageServer;
|
import club.joylink.xiannccda.ws.LineNetMessageServer;
|
||||||
|
import club.joylink.xiannccda.ws.LineTrainMessageServer;
|
||||||
import club.joylink.xiannccda.ws.WsMessageServerManager;
|
import club.joylink.xiannccda.ws.WsMessageServerManager;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
/** 线路设备信息更新 */
|
/**
|
||||||
|
* 线路设备信息更新
|
||||||
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class LineDeviceStatusService {
|
public class LineDeviceStatusService {
|
||||||
@ -38,5 +42,7 @@ public class LineDeviceStatusService {
|
|||||||
// websocket发送服务
|
// websocket发送服务
|
||||||
IMessageServer iMessageServer = new LineNetMessageServer(deviceStatusData);
|
IMessageServer iMessageServer = new LineNetMessageServer(deviceStatusData);
|
||||||
wsMessageServerManager.registerMessageServer(iMessageServer);
|
wsMessageServerManager.registerMessageServer(iMessageServer);
|
||||||
|
wsMessageServerManager.registerMessageServer(new LineTrainMessageServer());
|
||||||
|
wsMessageServerManager.registerMessageServer(new LineDeviceMessageServer());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,12 +2,23 @@ package club.joylink.xiannccda.ws;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
import org.springframework.util.PropertyPlaceholderHelper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消息服务
|
* 消息服务
|
||||||
*/
|
*/
|
||||||
public interface IMessageServer {
|
public interface IMessageServer {
|
||||||
|
|
||||||
|
final static PropertyPlaceholderHelper PLACEHOLDER_HELPER = new PropertyPlaceholderHelper(
|
||||||
|
"{", "}", ":", false);
|
||||||
|
|
||||||
|
default String createPlace(String name, String val) {
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put(name, val);
|
||||||
|
return PLACEHOLDER_HELPER.replacePlaceholders(this.getDestinationPattern(), properties);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 服务路径pattern
|
* 服务路径pattern
|
||||||
*
|
*
|
||||||
|
@ -0,0 +1,94 @@
|
|||||||
|
package club.joylink.xiannccda.ws;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.ats.message.collect.DeviceStatusData;
|
||||||
|
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataOperate;
|
||||||
|
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataRepository;
|
||||||
|
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
|
||||||
|
import com.google.protobuf.GeneratedMessageV3;
|
||||||
|
import com.google.protobuf.GeneratedMessageV3.Builder;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class LineDeviceMessageServer implements IMessageServer {
|
||||||
|
|
||||||
|
private final static String SUBSCRIPTION_PATH = "/queue/line/device/{lineId}";
|
||||||
|
|
||||||
|
|
||||||
|
private Set<String> lineIdSet = Sets.newHashSet();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDestinationPattern() {
|
||||||
|
return SUBSCRIPTION_PATH;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object onSubscription(String destination, Map<String, String> paramMap) {
|
||||||
|
String lineId = paramMap.get("lineId");
|
||||||
|
this.lineIdSet.add(lineId);
|
||||||
|
DeviceStatusData statusData = DeviceStatusDataRepository.getDeviceStatusData(lineId);
|
||||||
|
log.info("线路lineId={}订阅,发布全量数据", lineId);
|
||||||
|
WsLineMessage.Builder builder = WsLineMessage.newBuilder();
|
||||||
|
fillBuilderFunction(fun -> {
|
||||||
|
Map<String, Builder> builderMap = statusData.getAllDeviceMap().get(fun);
|
||||||
|
if (Objects.isNull(builderMap)) {
|
||||||
|
return Map.of();
|
||||||
|
}
|
||||||
|
Map<String, Message> messageMap = Maps.newHashMap();
|
||||||
|
builderMap.forEach((k, v) -> messageMap.put(k, v.build()));
|
||||||
|
return messageMap;
|
||||||
|
}, builder);
|
||||||
|
WsLineMessage buildMsg = builder.build();
|
||||||
|
return buildMsg.toByteArray();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getInterval() {
|
||||||
|
return 1500;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<TopicMessage> onTick() {
|
||||||
|
if (this.lineIdSet.isEmpty()) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<TopicMessage> messages = Lists.newArrayList();
|
||||||
|
for (String lineId : this.lineIdSet) {
|
||||||
|
WsLineMessage.Builder msg = WsLineMessage.newBuilder();
|
||||||
|
DeviceStatusData statusData = DeviceStatusDataRepository.getDeviceStatusData(lineId);
|
||||||
|
fillBuilderFunction((field) -> statusData.getStatusVOMap().get(field), msg);
|
||||||
|
DeviceStatusDataOperate.clearStatusVOMap(DeviceStatusDataRepository.getDeviceStatusData(lineId));
|
||||||
|
String destination = this.createPlace("lineId", lineId);
|
||||||
|
messages.add(new TopicMessage(destination, msg.build().toByteArray()));
|
||||||
|
}
|
||||||
|
return messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void fillBuilderFunction(Function<String, Map<String, Message>> compareFun, GeneratedMessageV3.Builder builder) {
|
||||||
|
// 消息体字段列表
|
||||||
|
builder.getDescriptorForType().getFields().stream()
|
||||||
|
.filter(f -> f.getType().equals(Type.MESSAGE))
|
||||||
|
.forEach(
|
||||||
|
field -> {
|
||||||
|
String fieldType = field.getMessageType().getName(); // 字段类型
|
||||||
|
Map<String, Message> allDeviceMap = compareFun.apply(fieldType);
|
||||||
|
if (!CollectionUtils.isEmpty(allDeviceMap)) {
|
||||||
|
builder.setField(field, new ArrayList<>(allDeviceMap.values()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,68 @@
|
|||||||
|
package club.joylink.xiannccda.ws;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.ats.message.collect.DeviceStatusDataRepository;
|
||||||
|
import club.joylink.xiannccda.ats.message.collect.TrainInfoData;
|
||||||
|
import club.joylink.xiannccda.dto.protos.TrainProto.TrainInfo;
|
||||||
|
import club.joylink.xiannccda.dto.protos.TrainProto.TrainInfo.Builder;
|
||||||
|
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineTrainMessage;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class LineTrainMessageServer implements IMessageServer {
|
||||||
|
|
||||||
|
private final static String SUBSCRIPTION_PATH = "/queue/line/train/{lineId}";
|
||||||
|
private final Set<String> lineIdSet = Sets.newHashSet();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDestinationPattern() {
|
||||||
|
return SUBSCRIPTION_PATH;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object onSubscription(String destination, Map<String, String> paramMap) {
|
||||||
|
String lineId = paramMap.get("lineId");
|
||||||
|
lineIdSet.add(lineId);
|
||||||
|
log.info("线路lineId={}订阅车辆信息,发布全量数据", lineId);
|
||||||
|
Collection<Builder> trains = DeviceStatusDataRepository.getAllTrainInfo(lineId).getAllTrain();
|
||||||
|
List<TrainInfo> trainMsg = trains.stream().map(TrainInfo.Builder::build).toList();
|
||||||
|
WsLineTrainMessage.Builder trainMessage = WsLineTrainMessage.newBuilder();
|
||||||
|
trainMessage.addAllTrainInfo(trainMsg);
|
||||||
|
return trainMessage.build().toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getInterval() {
|
||||||
|
return 1500;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<TopicMessage> onTick() {
|
||||||
|
if (this.lineIdSet.isEmpty()) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
List<TopicMessage> messages = Lists.newArrayList();
|
||||||
|
for (String lineId : this.lineIdSet) {
|
||||||
|
|
||||||
|
TrainInfoData trainInfoData = DeviceStatusDataRepository.getAllTrainInfo(lineId);
|
||||||
|
|
||||||
|
WsLineTrainMessage.Builder trainMessage = WsLineTrainMessage.newBuilder();
|
||||||
|
for (TrainInfo.Builder trainInfo : trainInfoData.getAllTrain()) {
|
||||||
|
trainInfoData.fillBlockOrRecordOrRemove(trainInfo, "TrainBlock", "block");
|
||||||
|
trainInfoData.fillBlockOrRecordOrRemove(trainInfo, "TrainRemove", "remove");
|
||||||
|
trainInfoData.fillBlockOrRecordOrRemove(trainInfo, "TrainRecord", "record");
|
||||||
|
trainInfoData.reput(trainInfo);
|
||||||
|
trainMessage.addTrainInfo(trainInfo);
|
||||||
|
}
|
||||||
|
String destination = this.createPlace("lineId", lineId);
|
||||||
|
messages.add(new TopicMessage(destination, trainMessage.build().toByteArray()));
|
||||||
|
}
|
||||||
|
return messages;
|
||||||
|
}
|
||||||
|
}
|
@ -17,6 +17,8 @@ import club.joylink.xiannccda.ats.message.line3.rep.DeviceStatusChangeResponse;
|
|||||||
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
|
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
|
||||||
import club.joylink.xiannccda.mock.message.MockAppContext;
|
import club.joylink.xiannccda.mock.message.MockAppContext;
|
||||||
import club.joylink.xiannccda.mock.message.NccMockData;
|
import club.joylink.xiannccda.mock.message.NccMockData;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData.ActionTypeEnum;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData.MsgTypeEnum;
|
||||||
import club.joylink.xiannccda.mock.message.NccMockDataService;
|
import club.joylink.xiannccda.mock.message.NccMockDataService;
|
||||||
import club.joylink.xiannccda.repository.INccMockDataRepository;
|
import club.joylink.xiannccda.repository.INccMockDataRepository;
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
@ -121,7 +123,11 @@ public class DeviceStatusTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void findDeviceTypeEnum() throws Exception {
|
public void findDeviceTypeEnum() throws Exception {
|
||||||
List<MessageData> messageData = this.dataService.loadALLData();
|
Wrapper<NccMockData> wrapper = Wrappers.<NccMockData>lambdaQuery()
|
||||||
|
.eq(NccMockData::getActionType, ActionTypeEnum.ALL.name())
|
||||||
|
.eq(NccMockData::getMsgType, MsgTypeEnum.REAL_TIME.name());
|
||||||
|
|
||||||
|
List<MessageData> messageData = this.dataService.loadALLData(wrapper);
|
||||||
for (MessageData messageDatum : messageData) {
|
for (MessageData messageDatum : messageData) {
|
||||||
List<? extends GeneratedMessageV3.Builder> msgs = messageDatum.generateProto();
|
List<? extends GeneratedMessageV3.Builder> msgs = messageDatum.generateProto();
|
||||||
System.out.println(messageDatum);
|
System.out.println(messageDatum);
|
||||||
|
209
src/test/java/club/joylink/xiannccda/device/TrainDataTest.java
Normal file
209
src/test/java/club/joylink/xiannccda/device/TrainDataTest.java
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
package club.joylink.xiannccda.device;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageData;
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageId;
|
||||||
|
import club.joylink.xiannccda.ats.message.convertor.DeviceStatusConvertor;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.PLATFORM;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.SIGNAL;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.STATION;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.SWITCH;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.TRAIN_MODE;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.device.DeviceType;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.rep.DeviceStatusBitmapResponse;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.rep.DeviceStatusBitmapResponse.DeviceEntity;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.rep.DeviceStatusBitmapResponse.DeviceTypeEntity;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.rep.DeviceStatusChangeResponse;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.rep.TrainIndicationUpdateResponse;
|
||||||
|
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
|
||||||
|
import club.joylink.xiannccda.mock.message.MockAppContext;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData.ActionTypeEnum;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockData.MsgTypeEnum;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockDataService;
|
||||||
|
import club.joylink.xiannccda.repository.INccMockDataRepository;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
|
import com.google.protobuf.GeneratedMessageV3;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
|
||||||
|
@SpringBootTest
|
||||||
|
public class TrainDataTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void trainRemove() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void trainUpdate() throws InterruptedException {
|
||||||
|
|
||||||
|
TrainIndicationUpdateResponse updateResponse = new TrainIndicationUpdateResponse();
|
||||||
|
updateResponse.setMsgId(MessageId.TRAIN_INDICATION_UPDATE);
|
||||||
|
updateResponse.setLineId((short) 3);
|
||||||
|
/**
|
||||||
|
* 车次号变化状态(2) 0x01:增加<br> 0x02:更新 <br> true-更新,false-增加
|
||||||
|
*/
|
||||||
|
updateResponse.setType(false);
|
||||||
|
/**
|
||||||
|
* 集中站站号(2)
|
||||||
|
*/
|
||||||
|
updateResponse.setRtuId((short) 3);
|
||||||
|
/**
|
||||||
|
* NCC车次窗编号(2)
|
||||||
|
*/
|
||||||
|
updateResponse.setNccWindow((short) 2);
|
||||||
|
/**
|
||||||
|
* 列车在车次窗中的位置(1)
|
||||||
|
*/
|
||||||
|
updateResponse.setNccWindowOffset((byte) 3);
|
||||||
|
/**
|
||||||
|
* 列车所在的设备的类型(2)
|
||||||
|
*/
|
||||||
|
updateResponse.setDevType(DeviceType.DEVICE_TYPE_SIGNAL);
|
||||||
|
/**
|
||||||
|
* 列车所在的设备的名称(24)
|
||||||
|
*/
|
||||||
|
updateResponse.setDevName("");
|
||||||
|
/**
|
||||||
|
* 列车标示号,全线唯一(若无法提供,缺省值为0)(10)
|
||||||
|
*/
|
||||||
|
updateResponse.setTrainIndex("");
|
||||||
|
/**
|
||||||
|
* 列车编组号(9)
|
||||||
|
*/
|
||||||
|
updateResponse.setGroupId("");
|
||||||
|
/**
|
||||||
|
* 表号(9)
|
||||||
|
*/
|
||||||
|
updateResponse.setTrainId("");
|
||||||
|
/**
|
||||||
|
* 车次号(12)
|
||||||
|
*/
|
||||||
|
updateResponse.setGlobalId("");
|
||||||
|
/**
|
||||||
|
* 目的地号(4)
|
||||||
|
*/
|
||||||
|
updateResponse.setDestinationId(1);
|
||||||
|
/**
|
||||||
|
* 编组数量(1)
|
||||||
|
*/
|
||||||
|
updateResponse.setRollingStock((byte) 2);
|
||||||
|
/**
|
||||||
|
* 司机号(13)
|
||||||
|
*/
|
||||||
|
updateResponse.setDriverId("");
|
||||||
|
/**
|
||||||
|
* 运行路径号(若无法提供,缺省值为0)(2)
|
||||||
|
*/
|
||||||
|
updateResponse.setRouteId((short) 1);
|
||||||
|
/**
|
||||||
|
* 计划偏离时间(4)
|
||||||
|
*/
|
||||||
|
updateResponse.setOptTime(123);
|
||||||
|
/**
|
||||||
|
* 列车状态,见附录6.3.14列车状态定义(4)
|
||||||
|
*/
|
||||||
|
updateResponse.setMode(TRAIN_MODE.IP_MODE_TRAIN_ATP_CUT.getValue());
|
||||||
|
/**
|
||||||
|
* 列车到点(7)
|
||||||
|
*/
|
||||||
|
updateResponse.setArriveTime(LocalDateTime.now());
|
||||||
|
/**
|
||||||
|
* 列车发点(7)
|
||||||
|
*/
|
||||||
|
updateResponse.setDepartTime(LocalDateTime.now());
|
||||||
|
/**
|
||||||
|
* 满载率(百分比,例如50,表示满载率为50%)(4)
|
||||||
|
*/
|
||||||
|
updateResponse.setRate(12);
|
||||||
|
/**
|
||||||
|
* 速度(KM/H)(1)
|
||||||
|
*/
|
||||||
|
updateResponse.setSpeed((byte) 123);
|
||||||
|
MockAppContext.publish(List.of(updateResponse));
|
||||||
|
|
||||||
|
Thread.sleep(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void allDevice() throws InterruptedException {
|
||||||
|
DeviceStatusBitmapResponse deviceStatusBitmapResponse = new DeviceStatusBitmapResponse();
|
||||||
|
deviceStatusBitmapResponse.setLineId((short) 3);
|
||||||
|
deviceStatusBitmapResponse.setRtuId(null);
|
||||||
|
deviceStatusBitmapResponse.setMsgId(MessageId.DEVICE_STATUS_BITMAP);
|
||||||
|
|
||||||
|
DeviceTypeEntity switchTypeEntity = new DeviceTypeEntity();
|
||||||
|
switchTypeEntity.setType(DeviceType.DEVICE_TYPE_SWITCH);
|
||||||
|
DeviceEntity switchs = new DeviceEntity();
|
||||||
|
switchs.setDevName("W78972");
|
||||||
|
switchs.setStatus(SWITCH.IP_SINGLE_SWITCH_STUS_ATC_INVALID.mask());
|
||||||
|
switchTypeEntity.setDeviceList(List.of(switchs));
|
||||||
|
//站台
|
||||||
|
DeviceEntity platform = new DeviceEntity();
|
||||||
|
platform.setDevName("PF91435");
|
||||||
|
platform.setStatus(PLATFORM.DOWN_HOLD.mask());
|
||||||
|
DeviceTypeEntity platformTypeEntity = new DeviceTypeEntity();
|
||||||
|
platformTypeEntity.setType(DeviceType.DEVICE_TYPE_PLATFORM);
|
||||||
|
platformTypeEntity.setDeviceList(List.of(platform));
|
||||||
|
//信号机
|
||||||
|
DeviceEntity signalform = new DeviceEntity();
|
||||||
|
signalform.setDevName("S44780");
|
||||||
|
signalform.setStatus(SIGNAL.GREEN_OPEN.mask());
|
||||||
|
DeviceTypeEntity signalTypeEntity = new DeviceTypeEntity();
|
||||||
|
signalTypeEntity.setType(DeviceType.DEVICE_TYPE_SIGNAL);
|
||||||
|
signalTypeEntity.setDeviceList(List.of(signalform));
|
||||||
|
|
||||||
|
DeviceEntity station = new DeviceEntity();
|
||||||
|
station.setDevName("Station61238");
|
||||||
|
station.setStatus(STATION.IP_STA_STUS_EXPECT_TERM_MODE3.mask());
|
||||||
|
DeviceTypeEntity stationEntity = new DeviceTypeEntity();
|
||||||
|
stationEntity.setType(DeviceType.DEVICE_TYPE_STATION);
|
||||||
|
stationEntity.setDeviceList(List.of(station));
|
||||||
|
|
||||||
|
deviceStatusBitmapResponse.setEntityList(List.of(switchTypeEntity, platformTypeEntity, signalTypeEntity, stationEntity));
|
||||||
|
MockAppContext.publish(List.of(deviceStatusBitmapResponse));
|
||||||
|
Thread.sleep(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private INccMockDataRepository iNccMockDataRepository;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvert() throws Exception {
|
||||||
|
NccMockData mockData = this.iNccMockDataRepository
|
||||||
|
.getOne(Wrappers.<NccMockData>lambdaQuery().eq(NccMockData::getId, 48838L));
|
||||||
|
System.out.println(mockData);
|
||||||
|
DeviceStatusChangeResponse changeResponse = JSONObject.parseObject(mockData.getMockData(), DeviceStatusChangeResponse.class);
|
||||||
|
System.out.println(changeResponse);
|
||||||
|
final DeviceStatusProto.Switch.Builder switchBuild = DeviceStatusConvertor.convert(SWITCH.class, changeResponse.getDeviceStatus());
|
||||||
|
switchBuild.setId(changeResponse.getDevName());
|
||||||
|
DeviceStatusProto.Switch switchProto = switchBuild.build();
|
||||||
|
System.out.println(switchProto.getId() + "--->" + switchProto.getIpSingleSwitchStusBlocked1());
|
||||||
|
// DeviceStatusDataRepository.addDeviceStatusData("3", switchBuild);
|
||||||
|
// System.out.println("aaaaaaaaaaaa");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NccMockDataService dataService;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void findDeviceTypeEnum() throws Exception {
|
||||||
|
Wrapper<NccMockData> wrapper = Wrappers.<NccMockData>lambdaQuery()
|
||||||
|
.eq(NccMockData::getActionType, ActionTypeEnum.ALL.name())
|
||||||
|
.eq(NccMockData::getMsgType, MsgTypeEnum.REAL_TIME.name());
|
||||||
|
|
||||||
|
List<MessageData> messageData = this.dataService.loadALLData(wrapper);
|
||||||
|
for (MessageData messageDatum : messageData) {
|
||||||
|
List<? extends GeneratedMessageV3.Builder> msgs = messageDatum.generateProto();
|
||||||
|
System.out.println(messageDatum);
|
||||||
|
System.out.println(messageDatum.generateProto());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -7,6 +7,7 @@ import club.joylink.xiannccda.ats.message.line3.MessageCons;
|
|||||||
import club.joylink.xiannccda.ats.message.line3.rep.ActionReportResponse;
|
import club.joylink.xiannccda.ats.message.line3.rep.ActionReportResponse;
|
||||||
import club.joylink.xiannccda.mock.message.MockAppContext;
|
import club.joylink.xiannccda.mock.message.MockAppContext;
|
||||||
import club.joylink.xiannccda.mock.message.NccMockDataService;
|
import club.joylink.xiannccda.mock.message.NccMockDataService;
|
||||||
|
import club.joylink.xiannccda.mock.message.NccMockDataService.DataType;
|
||||||
import club.joylink.xiannccda.protocal.x.TestUtil;
|
import club.joylink.xiannccda.protocal.x.TestUtil;
|
||||||
import club.joylink.xiannccda.repository.INccMockDataRepository;
|
import club.joylink.xiannccda.repository.INccMockDataRepository;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
@ -99,7 +100,7 @@ public class MockMsgTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void findMockData() {
|
public void findMockData() {
|
||||||
List<MessageData> dataList = this.dataService.getMessageData();
|
List<MessageData> dataList = this.dataService.getMessageData(DataType.DEVICE);
|
||||||
System.out.println(dataList);
|
System.out.println(dataList);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,7 +108,7 @@ public class MockMsgTest {
|
|||||||
public void findAll() {
|
public void findAll() {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (!this.dataService.finish()) {
|
while (!this.dataService.finish()) {
|
||||||
List<MessageData> dataList = this.dataService.getMessageData();
|
List<MessageData> dataList = this.dataService.getMessageData(DataType.DEVICE);
|
||||||
System.out.println((i++) + "----->" + dataList.size());
|
System.out.println((i++) + "----->" + dataList.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user