代码调整,列车数据获取调整

This commit is contained in:
tiger_zhou 2023-09-07 14:42:50 +08:00
parent 05da62da72
commit b51b9f7bba
7 changed files with 62 additions and 25 deletions

View File

@ -16,6 +16,7 @@ public class OccMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
connection.lastReceiveMessageTime = System.currentTimeMillis();
connection.receiveMessageLatest = System.currentTimeMillis();
List<MessageData> messages = FrameSchema.decode2(in);
if (!(messages == null || messages.size() == 0)) {
// System.out.println(

View File

@ -24,7 +24,9 @@ public class OccMessageHandler extends SimpleChannelInboundHandler<List<MessageD
OccTcpClientConnection connection;
boolean collectorData;
public OccMessageHandler(OccTcpClientConnection connection, boolean collectorData) {
this.connection = connection;
this.collectorData = collectorData;
log.info(" port[{}] ,收集消息数据[{}]", this.connection.port, this.collectorData);

View File

@ -44,25 +44,22 @@ public class OccTcpClientConnection {
*/
HeartBeatTimeoutHandler timeoutHandler;
/**
* 最后一次收到消息时间
* 最后一次收到消息时间,重新连接的时候也会更新此参数
*/
volatile long lastReceiveMessageTime;
/**
* 是否是实时数据连接
* 最后一次接受到消息的时间
*/
private boolean realDataConn;
/**
* 发送请求基础数据
*/
private AtomicBoolean requestBaseData = new AtomicBoolean(false);
long receiveMessageLatest;
public void write(MessageData messageData) {
this.channel.writeAndFlush(List.of(messageData));
}
public OccTcpClientConnection(XianOccMessagingClient client, String host, int port, boolean collectorData, boolean isReal) {
this.realDataConn = isReal;
public OccTcpClientConnection(XianOccMessagingClient client, String host, int port, boolean collectorData) {
this.client = client;
this.host = host;
this.port = port;
@ -83,6 +80,7 @@ public class OccTcpClientConnection {
}
void reconnect() {
if (this.channel != null) {
this.channel.close();
@ -99,12 +97,7 @@ public class OccTcpClientConnection {
this.lastReceiveMessageTime = System.currentTimeMillis();
this.channel = channelFuture.channel();
this.connected = true;
InUsedScheduleData scheduleData = DeviceDataRepository.findDataSouce(String.valueOf(this.client.getLineId()), DataTypeEnum.TRAIN_PLAN);
if (Objects.equals(false, this.realDataConn) && requestBaseData.compareAndSet(false, true) && scheduleData.isEmpty()) {
log.info("发送计划运行图请求 lineId[{}] host[{}] 端口[{}]", this.client.getLineId(), this.host, this.port);
LoadHistoryTGDataRequest dataRequest = new LoadHistoryTGDataRequest((short) this.client.getLineId(), LocalDateTime.now(), ApplyTypeEnum.PLAN_GRAPH);
this.write(dataRequest);
}
this.client.requestBaseData();
}
});
channelFuture.channel().closeFuture().addListener(listener -> {
@ -112,6 +105,7 @@ public class OccTcpClientConnection {
log.warn("与服务断连,尝试重连: {}", this.hostPortInfo());
this.connected = false;
this.channel = null;
this.client.resetRequestBaseDataFlag();
Thread.sleep(3000);
connect();
}

View File

@ -1,5 +1,11 @@
package club.joylink.xiannccda.ats.message;
import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest;
import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest.ApplyTypeEnum;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -23,14 +29,18 @@ public class XianOccMessagingClient {
* 非实时消息的连接
*/
private final OccTcpClientConnection nrtConnection;
/**
* 是否需要请求基础数据
*/
private AtomicBoolean requestBaseDataFlag = new AtomicBoolean(false);
public void send(MessageData md, boolean isRealTime) {
OccTcpClientConnection conn = isRealTime ? rtConnection : nrtConnection;
if (conn.connected) {
conn.write(md);
} else {
throw new RuntimeException("未连接occ");
String msg = String.format("未连接occ lineId[%s] host[%s] port[%s] messageId[%s]", this.lineId, this.host, conn.port, md.msgId.name());
throw new RuntimeException(msg);
}
}
@ -40,8 +50,29 @@ public class XianOccMessagingClient {
this.lineId = lineId;
// 创建实时和非实时消息连接
this.rtConnection = new OccTcpClientConnection(this, host, realPort, collectorData, true);
this.nrtConnection = new OccTcpClientConnection(this, host, unRealPort, false, false);
this.rtConnection = new OccTcpClientConnection(this, host, realPort, collectorData);
this.nrtConnection = new OccTcpClientConnection(this, host, unRealPort, false);
}
private final static long RESET_REQUEST_BASE_TIME = TimeUnit.HOURS.toMillis(2);
/**
* 重置重新获取基础数据
*/
public void resetRequestBaseDataFlag() {
//实时数据客户端断开并且最新的获取时间 + 2小时小于当前时间
if (Objects.equals(false, this.rtConnection.connected) && (this.rtConnection.receiveMessageLatest + RESET_REQUEST_BASE_TIME) < System.currentTimeMillis()) {
this.requestBaseDataFlag.set(false);
}
}
public void requestBaseData() {
if (this.rtConnection.connected && this.nrtConnection.connected && this.requestBaseDataFlag.compareAndSet(false, true)) {
log.info("发送计划运行图请求 lineId[{}] host[{}] 实时接口[{}]", this.lineId, this.host, false);
LoadHistoryTGDataRequest dataRequest = new LoadHistoryTGDataRequest((short) this.lineId, LocalDateTime.now(), ApplyTypeEnum.PLAN_GRAPH);
this.send(dataRequest, false);
}
}
public ConnectionInfo getRTClientConnectionInfo() {

View File

@ -50,6 +50,14 @@ public class TrainAtpCutAlertMonitoringTask implements AlertMonitoringTask {
}
}
public void updateTrainInfo(TrainInfo.Builder train) {
TrainInfo.Builder saveTrainInfo = this.trainInfoMap.get(train.getGroupId());
if (Objects.nonNull(saveTrainInfo)) {
train.setReceiveTime(saveTrainInfo.getReceiveTime());
this.trainInfoMap.put(train.getGroupId(), train);
}
}
/**
* 判断紧致后发生atp切除并恢复
* <p>

View File

@ -34,6 +34,8 @@ public class TrainModeAlertListener implements AlertSourceEventListener<TrainAle
this.atpCutAlertMonitoringTask.putTrainInfoMonitor(trainInfo);
} else if (Objects.equals(false, trainMode.getIpModeTrainAtpCut())) {
this.atpCutAlertMonitoringTask.recoverAtpCut(trainInfo);
} else {
this.atpCutAlertMonitoringTask.updateTrainInfo(trainInfo);
}
} else if (trainMsgBuild instanceof TrainRemove.Builder trainRemove) {
this.atpCutAlertMonitoringTask.trainRemoveAllInfo(trainRemove);

View File

@ -2,6 +2,8 @@ package club.joylink.xiannccda.device;
import club.joylink.xiannccda.ats.message.convertor.DeviceStatusConvertor;
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus;
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.SWITCH;
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.TRACK;
import club.joylink.xiannccda.ats.message.line3.device.DeviceStatus.TRAIN_MODE;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Platform;
@ -107,6 +109,8 @@ public class DeviceStatusCheckTest {
@Test
public void sectionTest() {
System.out.println(0x00000002);
System.out.println(TRACK.ATC_INVALID.mask());
System.out.println(SWITCH.IP_SINGLE_SWITCH_STUS_ATC_INVALID.mask());
}
@Test
@ -139,10 +143,5 @@ public class DeviceStatusCheckTest {
System.out.println(lowerToUpper); //{x=A, b=B, c=D}
}
@Test
public void dd() throws UnsupportedEncodingException {
java.lang.String dd = "Q/ohUOUluX5c+sKN62xNrEdSGpixEb//Uv2/AORejJwwQbl6/zF5MjJSpyF6OBLqjInQ24u0DKYMy51/U1As5Pj+YnbD3n7A0nCcqQ1fkhJpt2wdjnaqbYunYamwKctCfLBwoBEjK62TA1ZcON0KH/rb9TWsfBWLJVIlV5PYMXYJXnYbt7KKY8XxOgIT7bZbXeZS/1InIdLYJVzmyCGtnFFg8THLp2MQJzXhiTqerzig7HWR++TypL778+mPaXuDHefKC0cTxz7Zz9LefkvsEB6h3eYBcDj4EcnZzI91VTIPyzfw2JhJ67otWIxDBNS0hfo9m27N3irpIr+eyu6hD/W3KkNJPamzvYOecHgCQKs7vDq8wF/XZm0jWwqQF6XDacf1K+tcL77KsDpPW+HhnUQ/347orSpFGoPpkIUXT27NBzg0V9OZOjbUsdnMXVZgKF4pEBR6FrlDhpjVCgIgn6I3a8ZG6naPZgQd5HIGzVEuYqRHehz8L3e53lpT0mXeEYcdYd/K9D7VJ7CaUZ6KAO1R29+fRNiuj4PPI2+f5UaaeXg85Cq3q5Yi1q4bkd94KkIfEwV1SbtRMRFC/ZVtBeIozq/fPj/j6E2cDenLqlzZFzapDzAz/3mnWe+hcNn4CVbl40ELGZKduXJxUN0ftY59annoMtKwNbDS58VPtXBZ4GsA4gQLgoJKs04L3cml8oEk4j0cuNYPi/KH58rObnlKAtfsFwsOuIOG2k5i5CdnUEvTEmHgvvghJpCOdY2kngmHHvtw+hbCsa1RceeIxdZYLMwFyQLNn+VkrToI6/msEgj1b6F/garriuocN9gN7ZvjC2Mi8ZnvTGaRr5qGb21y13JNWEAhmp0H7IH2LuTL73SPgBB58DmQcRPB7YmwC/HoZB6VUyuIGCYSwGbCNnW6s8/wDGw/KSluO69DtasgX9HGhBtm6qaqEiToJWglM46cPv0zoofFkCyXgbyTerSApcOvi4Onm4CWvnarEvclDdNbGoNvQstYJ8ol/H/hjmVk/Lb87y2ynD58vR4QrA==";
byte[] bgb = Base64.getDecoder().decode(dd);
System.out.println(new String(bgb, "UTF-8"));
}
}