From e3bf6d927d88d0c43056d30cb5306e1899755f32 Mon Sep 17 00:00:00 2001
From: tiger_zhou
Date: Wed, 20 Nov 2024 08:55:03 +0800
Subject: [PATCH] =?UTF-8?q?ats=E6=95=B0=E6=8D=AE=E6=94=B6=E9=9B=86?=
=?UTF-8?q?=E8=B0=83=E6=95=B4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../ats/message/FrameSchemaParse.java | 42 ++++++--
.../xiannccda/ats/message/MessageData.java | 6 +-
.../xiannccda/ats/message/MessageId.java | 4 +-
.../ats/message/OccMessageDecoder.java | 18 ++--
.../ats/message/OccMessageHandler.java | 3 +-
.../entity/MockAtsCollectorData.java | 44 +++++++++
.../event/MockCollectorMessageDataEvent.java | 18 ++++
.../event/MockCollectorMessageHandle.java | 27 ++++++
.../event/MockCollectorMessageDataEvent.java | 10 --
.../event/MockCollectorMessageHandle.java | 14 ---
.../mock/message/MockAppContext.java | 19 ++--
.../IMockAtsCollectorDataRepository.java | 16 ++++
.../impl/MockAtsCollectorDataRepository.java | 20 ++++
src/main/resources/application-local.yml | 2 +-
.../xiannccda/MybatisPlusGenerator.java | 2 +-
.../occmock/AtsCollectorDataTest.java | 95 +++++++++++++++++++
16 files changed, 278 insertions(+), 62 deletions(-)
create mode 100644 src/main/java/club/joylink/xiannccda/entity/MockAtsCollectorData.java
create mode 100644 src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java
create mode 100644 src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java
delete mode 100644 src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java
delete mode 100644 src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java
create mode 100644 src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java
create mode 100644 src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java
create mode 100644 src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java
diff --git a/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java b/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java
index 2c7c0bc..db9ebe7 100644
--- a/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java
+++ b/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java
@@ -8,6 +8,9 @@ import io.netty.buffer.Unpooled;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -19,6 +22,8 @@ public class FrameSchemaParse {
* key = 端口号(表明是实时还是非实时)
*/
private static final Map PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
+ private static final Map SOURCE_PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
+
/*private static final ByteBuf INPUT_DATA_BUF = Unpooled.buffer();
public static List decode2(ByteBuf inByteBuf, Integer port) {
@@ -38,18 +43,27 @@ public class FrameSchemaParse {
return null;
}*/
- public static List decode(ByteBuf inByteBuf, Integer port) {
+
+ public static DecodeResult decode(ByteBuf inByteBuf, Integer port) {
ByteBuf packageData = PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
- boolean readComplate = schemaParse(inByteBuf, packageData);
+ ByteBuf sourcePackageData = SOURCE_PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
+ boolean readComplate = schemaParse(inByteBuf, packageData, sourcePackageData);
if (readComplate) {
try {
+ DecodeResult dr = new DecodeResult();
List messageDataList = Lists.newArrayList();
messageDecode(packageData, messageDataList);
- return messageDataList;
+ dr.data = messageDataList;
+ byte[] data = new byte[sourcePackageData.readableBytes()];
+ sourcePackageData.readBytes(data);
+ dr.sourceData = data;
+
+ return dr;
} catch (Exception e) {
log.error("OCC消息解析异常", e);
return null;
} finally {
+ sourcePackageData.clear();
packageData.clear();
}
}
@@ -82,7 +96,7 @@ public class FrameSchemaParse {
}
}
- private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
+ private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData, ByteBuf sourcePackageData) {
inByteBuf.markReaderIndex();
int totalReadables = inByteBuf.readableBytes();
if (totalReadables < 4) {
@@ -100,9 +114,23 @@ public class FrameSchemaParse {
log.error("可读内容不足 sysId[{}] totalLen[{}] flag[{}] contentLen[{}] readableBytes[{}]", sysId, totalLength, flag, contentLen, readables);
return false;
}
-// byte[] bb = new byte[contentLen];
-// inByteBuf.readBytes(bb);
- packageData.writeBytes(inByteBuf, contentLen);
+ byte[] bb = new byte[contentLen];
+ inByteBuf.readBytes(bb);
+ packageData.writeBytes(bb);
+// packageData.writeBytes(inByteBuf, contentLen);
+ sourcePackageData.writeByte(sysId);
+ sourcePackageData.writeShort(totalLength);
+ sourcePackageData.writeByte(flag);
+ sourcePackageData.writeBytes(bb);
return flag != Flag_Multi;
}
+
+ @Getter
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class DecodeResult {
+
+ private List data;
+ private byte[] sourceData;
+ }
}
diff --git a/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java b/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java
index 6ebb7c6..d83f24f 100644
--- a/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java
+++ b/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java
@@ -66,6 +66,7 @@ public abstract class MessageData {
}
+
public void decode(ByteBuf buf) throws Exception {
final int headerBytes = 10;
int readableBytes = buf.readableBytes();
@@ -122,9 +123,12 @@ public abstract class MessageData {
private Long id;
private List messages;
- public DecodeMessageData(Long id, List messages) {
+ private byte[] sourceData;
+
+ public DecodeMessageData(Long id, List messages, byte[] sourceData) {
this.id = id;
this.messages = messages;
+ this.sourceData = sourceData;
}
}
}
diff --git a/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java b/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java
index 54b9f1b..29d0742 100644
--- a/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java
+++ b/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java
@@ -178,9 +178,9 @@ public enum MessageId {
/**
* 消息对象创建接口
*/
- OccMessageCreate omc;
+ public OccMessageCreate omc;
- interface OccMessageCreate {
+ public interface OccMessageCreate {
MessageData create();
}
diff --git a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java
index 00a6c7e..174a90f 100644
--- a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java
+++ b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java
@@ -1,11 +1,14 @@
package club.joylink.xiannccda.ats.message;
+import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
import cn.hutool.core.util.IdUtil;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
+import java.util.Objects;
import lombok.Getter;
public class OccMessageDecoder extends ByteToMessageDecoder {
@@ -26,18 +29,9 @@ public class OccMessageDecoder extends ByteToMessageDecoder {
connection.lastReceiveMessageTime = System.currentTimeMillis();
connection.receiveMessageLatest = System.currentTimeMillis();
Long id = IdUtil.getSnowflake().nextId();
-
-// List messages = FrameSchema.decode2(in);
- List messages = FrameSchemaParse.decode(in, this.connection.port);
- if (!(messages == null || messages.size() == 0)) {
-// System.out.println(
-// String.format(
-// "收到OCC消息: %s 来自 port: %s", this.connection.port,
-// JSON.toJSONString(
-// messages.get(0), Feature.PrettyFormat, Feature.FieldBased, Feature.WriteNulls)));
-
-// out.add(messages);
- out.add(List.of(new DecodeMessageData(id, messages)));
+ DecodeResult dr = FrameSchemaParse.decode(in, this.connection.port);
+ if (Objects.nonNull(dr) && CollectionUtils.isNotEmpty(dr.getData())) {
+ out.add(List.of(new DecodeMessageData(id, dr.getData(), dr.getSourceData())));
}
}
diff --git a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java
index 5fca6ad..1d02ab1 100644
--- a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java
+++ b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java
@@ -67,8 +67,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler
+ *
+ *
+ *
+ * @author walker-sheng
+ * @since 2024-11-19
+ */
+@Getter
+@Setter
+@Accessors(chain = true)
+@TableName("mock_ats_collector_data")
+@Schema(name = "MockAtsCollectorData")
+public class MockAtsCollectorData {
+
+ @TableId(value = "id")
+ private Long id;
+
+ @Schema(description = "线路id")
+ private Integer lineId;
+ @Schema(description = "ats原始数据")
+ private byte[] sourceDate;
+
+ private LocalDateTime createTime;
+
+ public static final String ID = "id";
+
+ public static final String LINE_ID = "line_id";
+
+ public static final String SOURCE_DATE = "source_date";
+
+ public static final String CREATE_TIME = "create_time";
+}
diff --git a/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java
new file mode 100644
index 0000000..ff63932
--- /dev/null
+++ b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java
@@ -0,0 +1,18 @@
+package club.joylink.xiannccda.event;
+
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+public class MockCollectorMessageDataEvent extends ApplicationEvent {
+
+ @Getter
+ private Integer lineId;
+ @Getter
+ private Long tableId;
+
+ public MockCollectorMessageDataEvent(Object source, Integer lineId, Long tableId) {
+ super(source);
+ this.lineId = lineId;
+ this.tableId = tableId;
+ }
+}
diff --git a/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java
new file mode 100644
index 0000000..b9a4539
--- /dev/null
+++ b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java
@@ -0,0 +1,27 @@
+package club.joylink.xiannccda.event;
+
+import club.joylink.xiannccda.entity.MockAtsCollectorData;
+import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
+import jakarta.annotation.Resource;
+import java.time.LocalDateTime;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MockCollectorMessageHandle {
+
+ @Resource
+ private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
+
+ @Async("nsExecutor")
+ @EventListener
+ public void mockData(MockCollectorMessageDataEvent event) {
+ MockAtsCollectorData collectorData = new MockAtsCollectorData();
+ collectorData.setId(event.getTableId());
+ collectorData.setLineId(event.getLineId());
+ collectorData.setSourceDate((byte[]) event.getSource());
+ collectorData.setCreateTime(LocalDateTime.now());
+ this.mockAtsCollectorDataRepository.save(collectorData);
+ }
+}
diff --git a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java b/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java
deleted file mode 100644
index 062fb39..0000000
--- a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package club.joylink.xiannccda.mock.collector.event;
-
-import org.springframework.context.ApplicationEvent;
-
-public class MockCollectorMessageDataEvent extends ApplicationEvent {
-
- public MockCollectorMessageDataEvent(Object source) {
- super(source);
- }
-}
diff --git a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java b/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java
deleted file mode 100644
index 9c4779f..0000000
--- a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package club.joylink.xiannccda.mock.collector.event;
-
-import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
-import org.springframework.context.event.EventListener;
-import org.springframework.scheduling.annotation.Async;
-
-public class MockCollectorMessageHandle {
-
- @Async("nsExecutor")
- @EventListener
- public void mockData(MockMessageDataEvent event) {
-
- }
-}
diff --git a/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java b/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java
index ccc467c..0331c2a 100644
--- a/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java
+++ b/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java
@@ -3,27 +3,22 @@ package club.joylink.xiannccda.mock.message;
import club.joylink.xiannccda.ats.message.MessageData;
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
import club.joylink.xiannccda.ats.message.MessageId;
+import club.joylink.xiannccda.ats.message.MessageResponse;
+import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
import club.joylink.xiannccda.constants.SystemContext;
-import club.joylink.xiannccda.mock.collector.event.MockCollectorMessageDataEvent;
+import club.joylink.xiannccda.event.MockCollectorMessageDataEvent;
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
-import com.alibaba.fastjson2.JSON;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.stereotype.Component;
public class MockAppContext {
public static void publishCollectorAtsData(DecodeMessageData nessageData) {
-// List newList = datas.stream().filter(d -> d.getMsgId() != MessageId.MESSAGE_POLLING).toList();
- SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData));
+ MessageData findMsg = nessageData.getMessages().stream().filter(d -> !(d instanceof HeartBeatMsg)).findFirst().orElseGet(null);
+ if (Objects.nonNull(findMsg) && findMsg instanceof MessageResponse mr) {
+ SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData.getSourceData(), mr.getLineId().intValue(), nessageData.getId()));
+ }
}
public static void publish(List datas) {
diff --git a/src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java b/src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java
new file mode 100644
index 0000000..95addb9
--- /dev/null
+++ b/src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java
@@ -0,0 +1,16 @@
+package club.joylink.xiannccda.repository;
+
+import club.joylink.xiannccda.entity.MockAtsCollectorData;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+ *
+ * 服务类
+ *
+ *
+ * @author walker-sheng
+ * @since 2024-11-19
+ */
+public interface IMockAtsCollectorDataRepository extends IService {
+
+}
diff --git a/src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java b/src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java
new file mode 100644
index 0000000..1b533bc
--- /dev/null
+++ b/src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java
@@ -0,0 +1,20 @@
+package club.joylink.xiannccda.repository.impl;
+
+import club.joylink.xiannccda.entity.MockAtsCollectorData;
+import club.joylink.xiannccda.mapper.MockAtsCollectorDataMapper;
+import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+ *
+ * 服务实现类
+ *
+ *
+ * @author walker-sheng
+ * @since 2024-11-19
+ */
+@Service
+public class MockAtsCollectorDataRepository extends ServiceImpl implements IMockAtsCollectorDataRepository {
+
+}
diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml
index fa9eafd..9c4187d 100644
--- a/src/main/resources/application-local.yml
+++ b/src/main/resources/application-local.yml
@@ -21,7 +21,7 @@ occ-client:
server-host: 127.0.0.1
real-port: 2603
un-real-port: 5703
- collector-data: false
+ collector-data: true
monitor-handware-change: false
receiveMsgTimeout: 6
connectionOcc: true
diff --git a/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java b/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java
index 8e35cd6..13d0c52 100644
--- a/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java
+++ b/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java
@@ -29,7 +29,7 @@ public class MybatisPlusGenerator {
private static final List includeTableList = new ArrayList<>();
static {
- includeTableList.add("fault_query");
+ includeTableList.add("mock_ats_collector_data");
// includeTableList.add("relieve_time_shere");
}
diff --git a/src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java b/src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java
new file mode 100644
index 0000000..127638e
--- /dev/null
+++ b/src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java
@@ -0,0 +1,95 @@
+package club.joylink.xiannccda.occmock;
+
+
+import club.joylink.xiannccda.ats.message.FrameSchemaParse;
+import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
+import club.joylink.xiannccda.ats.message.MessageData;
+import club.joylink.xiannccda.ats.message.MessageId;
+import club.joylink.xiannccda.entity.MockAtsCollectorData;
+import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import jakarta.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class AtsCollectorDataTest {
+
+ static final short Flag_Multi = 1;
+ @Resource
+ private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
+
+ @Test
+ public void loadEncodeTest() throws Exception {
+ List collDataList = this.mockAtsCollectorDataRepository.list();
+ for (MockAtsCollectorData cd : collDataList) {
+ ByteBuf in = Unpooled.wrappedBuffer(cd.getSourceDate());
+ ByteBuf out = Unpooled.buffer();
+
+ if (schemaParse(in, out)) {
+ List messages = new ArrayList();
+ messageDecode(out, messages);
+ for (MessageData message : messages) {
+ System.out.println(message);
+ }
+
+ }
+ }
+ }
+
+ private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
+ inByteBuf.markReaderIndex();
+ int totalReadables = inByteBuf.readableBytes();
+ if (totalReadables < 4) {
+// inByteBuf.resetReaderIndex();
+
+ return false;
+ }
+ short sysId = inByteBuf.readUnsignedByte();
+ int totalLength = inByteBuf.readUnsignedShort();
+ short flag = inByteBuf.readUnsignedByte();
+ int contentLen = totalLength - 1;
+ int readables = inByteBuf.readableBytes();
+ if (readables < contentLen) {
+ inByteBuf.resetReaderIndex();
+
+ return false;
+ }
+ byte[] bb = new byte[contentLen];
+ inByteBuf.readBytes(bb);
+ packageData.writeBytes(bb);
+
+ if (flag == Flag_Multi) {
+ schemaParse(inByteBuf, packageData);
+ }
+ return true;
+ }
+
+ private static void messageDecode(ByteBuf buf, List messages) throws Exception {
+ int len = buf.getUnsignedShort(0);
+ byte[] msgData = new byte[len + 2];
+ buf.readBytes(msgData);
+ ByteBuf msgBuf = Unpooled.wrappedBuffer(msgData);
+
+ int msgId = msgBuf.getUnsignedShort(8);
+ MessageId messageId = MessageId.of(msgId);
+ if (messageId.equals(MessageId.UNKNOWN)) {
+ throw new Exception("未知的消息id");
+ }
+ if (messageId.omc == null || messageId.omc.create() == null) {
+ throw new Exception(
+ String.format("id=%s的消息没有消息对象创建接口omc或接口返回null", messageId));
+ }
+ MessageData message = messageId.omc.create();
+ message.setMsgId(messageId);
+ message.decode(msgBuf);
+ messages.add(message);
+
+ if (buf.readableBytes() > 0) {
+ messageDecode(buf, messages);
+ }
+ }
+}