diff --git a/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java b/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java index 2c7c0bc..db9ebe7 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/FrameSchemaParse.java @@ -8,6 +8,9 @@ import io.netty.buffer.Unpooled; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -19,6 +22,8 @@ public class FrameSchemaParse { * key = 端口号(表明是实时还是非实时) */ private static final Map PACKAGE_DATA_MAP = new ConcurrentHashMap<>(); + private static final Map SOURCE_PACKAGE_DATA_MAP = new ConcurrentHashMap<>(); + /*private static final ByteBuf INPUT_DATA_BUF = Unpooled.buffer(); public static List decode2(ByteBuf inByteBuf, Integer port) { @@ -38,18 +43,27 @@ public class FrameSchemaParse { return null; }*/ - public static List decode(ByteBuf inByteBuf, Integer port) { + + public static DecodeResult decode(ByteBuf inByteBuf, Integer port) { ByteBuf packageData = PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer()); - boolean readComplate = schemaParse(inByteBuf, packageData); + ByteBuf sourcePackageData = SOURCE_PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer()); + boolean readComplate = schemaParse(inByteBuf, packageData, sourcePackageData); if (readComplate) { try { + DecodeResult dr = new DecodeResult(); List messageDataList = Lists.newArrayList(); messageDecode(packageData, messageDataList); - return messageDataList; + dr.data = messageDataList; + byte[] data = new byte[sourcePackageData.readableBytes()]; + sourcePackageData.readBytes(data); + dr.sourceData = data; + + return dr; } catch (Exception e) { log.error("OCC消息解析异常", e); return null; } finally { + sourcePackageData.clear(); packageData.clear(); } } @@ -82,7 +96,7 @@ public class FrameSchemaParse { } } - private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) { + private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData, ByteBuf sourcePackageData) { inByteBuf.markReaderIndex(); int totalReadables = inByteBuf.readableBytes(); if (totalReadables < 4) { @@ -100,9 +114,23 @@ public class FrameSchemaParse { log.error("可读内容不足 sysId[{}] totalLen[{}] flag[{}] contentLen[{}] readableBytes[{}]", sysId, totalLength, flag, contentLen, readables); return false; } -// byte[] bb = new byte[contentLen]; -// inByteBuf.readBytes(bb); - packageData.writeBytes(inByteBuf, contentLen); + byte[] bb = new byte[contentLen]; + inByteBuf.readBytes(bb); + packageData.writeBytes(bb); +// packageData.writeBytes(inByteBuf, contentLen); + sourcePackageData.writeByte(sysId); + sourcePackageData.writeShort(totalLength); + sourcePackageData.writeByte(flag); + sourcePackageData.writeBytes(bb); return flag != Flag_Multi; } + + @Getter + @AllArgsConstructor + @NoArgsConstructor + public static class DecodeResult { + + private List data; + private byte[] sourceData; + } } diff --git a/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java b/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java index 6ebb7c6..d83f24f 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/MessageData.java @@ -66,6 +66,7 @@ public abstract class MessageData { } + public void decode(ByteBuf buf) throws Exception { final int headerBytes = 10; int readableBytes = buf.readableBytes(); @@ -122,9 +123,12 @@ public abstract class MessageData { private Long id; private List messages; - public DecodeMessageData(Long id, List messages) { + private byte[] sourceData; + + public DecodeMessageData(Long id, List messages, byte[] sourceData) { this.id = id; this.messages = messages; + this.sourceData = sourceData; } } } diff --git a/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java b/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java index 54b9f1b..29d0742 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/MessageId.java @@ -178,9 +178,9 @@ public enum MessageId { /** * 消息对象创建接口 */ - OccMessageCreate omc; + public OccMessageCreate omc; - interface OccMessageCreate { + public interface OccMessageCreate { MessageData create(); } diff --git a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java index 00a6c7e..174a90f 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageDecoder.java @@ -1,11 +1,14 @@ package club.joylink.xiannccda.ats.message; +import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult; import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData; import cn.hutool.core.util.IdUtil; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; +import java.util.Objects; import lombok.Getter; public class OccMessageDecoder extends ByteToMessageDecoder { @@ -26,18 +29,9 @@ public class OccMessageDecoder extends ByteToMessageDecoder { connection.lastReceiveMessageTime = System.currentTimeMillis(); connection.receiveMessageLatest = System.currentTimeMillis(); Long id = IdUtil.getSnowflake().nextId(); - -// List messages = FrameSchema.decode2(in); - List messages = FrameSchemaParse.decode(in, this.connection.port); - if (!(messages == null || messages.size() == 0)) { -// System.out.println( -// String.format( -// "收到OCC消息: %s 来自 port: %s", this.connection.port, -// JSON.toJSONString( -// messages.get(0), Feature.PrettyFormat, Feature.FieldBased, Feature.WriteNulls))); - -// out.add(messages); - out.add(List.of(new DecodeMessageData(id, messages))); + DecodeResult dr = FrameSchemaParse.decode(in, this.connection.port); + if (Objects.nonNull(dr) && CollectionUtils.isNotEmpty(dr.getData())) { + out.add(List.of(new DecodeMessageData(id, dr.getData(), dr.getSourceData()))); } } diff --git a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java index 5fca6ad..1d02ab1 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java @@ -67,8 +67,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler + * + *

+ * + * @author walker-sheng + * @since 2024-11-19 + */ +@Getter +@Setter +@Accessors(chain = true) +@TableName("mock_ats_collector_data") +@Schema(name = "MockAtsCollectorData") +public class MockAtsCollectorData { + + @TableId(value = "id") + private Long id; + + @Schema(description = "线路id") + private Integer lineId; + @Schema(description = "ats原始数据") + private byte[] sourceDate; + + private LocalDateTime createTime; + + public static final String ID = "id"; + + public static final String LINE_ID = "line_id"; + + public static final String SOURCE_DATE = "source_date"; + + public static final String CREATE_TIME = "create_time"; +} diff --git a/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java new file mode 100644 index 0000000..ff63932 --- /dev/null +++ b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageDataEvent.java @@ -0,0 +1,18 @@ +package club.joylink.xiannccda.event; + +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +public class MockCollectorMessageDataEvent extends ApplicationEvent { + + @Getter + private Integer lineId; + @Getter + private Long tableId; + + public MockCollectorMessageDataEvent(Object source, Integer lineId, Long tableId) { + super(source); + this.lineId = lineId; + this.tableId = tableId; + } +} diff --git a/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java new file mode 100644 index 0000000..b9a4539 --- /dev/null +++ b/src/main/java/club/joylink/xiannccda/event/MockCollectorMessageHandle.java @@ -0,0 +1,27 @@ +package club.joylink.xiannccda.event; + +import club.joylink.xiannccda.entity.MockAtsCollectorData; +import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository; +import jakarta.annotation.Resource; +import java.time.LocalDateTime; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Component +public class MockCollectorMessageHandle { + + @Resource + private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository; + + @Async("nsExecutor") + @EventListener + public void mockData(MockCollectorMessageDataEvent event) { + MockAtsCollectorData collectorData = new MockAtsCollectorData(); + collectorData.setId(event.getTableId()); + collectorData.setLineId(event.getLineId()); + collectorData.setSourceDate((byte[]) event.getSource()); + collectorData.setCreateTime(LocalDateTime.now()); + this.mockAtsCollectorDataRepository.save(collectorData); + } +} diff --git a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java b/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java deleted file mode 100644 index 062fb39..0000000 --- a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageDataEvent.java +++ /dev/null @@ -1,10 +0,0 @@ -package club.joylink.xiannccda.mock.collector.event; - -import org.springframework.context.ApplicationEvent; - -public class MockCollectorMessageDataEvent extends ApplicationEvent { - - public MockCollectorMessageDataEvent(Object source) { - super(source); - } -} diff --git a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java b/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java deleted file mode 100644 index 9c4779f..0000000 --- a/src/main/java/club/joylink/xiannccda/mock/collector/event/MockCollectorMessageHandle.java +++ /dev/null @@ -1,14 +0,0 @@ -package club.joylink.xiannccda.mock.collector.event; - -import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; - -public class MockCollectorMessageHandle { - - @Async("nsExecutor") - @EventListener - public void mockData(MockMessageDataEvent event) { - - } -} diff --git a/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java b/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java index ccc467c..0331c2a 100644 --- a/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java +++ b/src/main/java/club/joylink/xiannccda/mock/message/MockAppContext.java @@ -3,27 +3,22 @@ package club.joylink.xiannccda.mock.message; import club.joylink.xiannccda.ats.message.MessageData; import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData; import club.joylink.xiannccda.ats.message.MessageId; +import club.joylink.xiannccda.ats.message.MessageResponse; +import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg; import club.joylink.xiannccda.constants.SystemContext; -import club.joylink.xiannccda.mock.collector.event.MockCollectorMessageDataEvent; +import club.joylink.xiannccda.event.MockCollectorMessageDataEvent; import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent; -import com.alibaba.fastjson2.JSON; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.time.LocalDateTime; import java.util.List; import java.util.Objects; -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.stereotype.Component; public class MockAppContext { public static void publishCollectorAtsData(DecodeMessageData nessageData) { -// List newList = datas.stream().filter(d -> d.getMsgId() != MessageId.MESSAGE_POLLING).toList(); - SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData)); + MessageData findMsg = nessageData.getMessages().stream().filter(d -> !(d instanceof HeartBeatMsg)).findFirst().orElseGet(null); + if (Objects.nonNull(findMsg) && findMsg instanceof MessageResponse mr) { + SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData.getSourceData(), mr.getLineId().intValue(), nessageData.getId())); + } } public static void publish(List datas) { diff --git a/src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java b/src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java new file mode 100644 index 0000000..95addb9 --- /dev/null +++ b/src/main/java/club/joylink/xiannccda/repository/IMockAtsCollectorDataRepository.java @@ -0,0 +1,16 @@ +package club.joylink.xiannccda.repository; + +import club.joylink.xiannccda.entity.MockAtsCollectorData; +import com.baomidou.mybatisplus.extension.service.IService; + +/** + *

+ * 服务类 + *

+ * + * @author walker-sheng + * @since 2024-11-19 + */ +public interface IMockAtsCollectorDataRepository extends IService { + +} diff --git a/src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java b/src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java new file mode 100644 index 0000000..1b533bc --- /dev/null +++ b/src/main/java/club/joylink/xiannccda/repository/impl/MockAtsCollectorDataRepository.java @@ -0,0 +1,20 @@ +package club.joylink.xiannccda.repository.impl; + +import club.joylink.xiannccda.entity.MockAtsCollectorData; +import club.joylink.xiannccda.mapper.MockAtsCollectorDataMapper; +import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +/** + *

+ * 服务实现类 + *

+ * + * @author walker-sheng + * @since 2024-11-19 + */ +@Service +public class MockAtsCollectorDataRepository extends ServiceImpl implements IMockAtsCollectorDataRepository { + +} diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index fa9eafd..9c4187d 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -21,7 +21,7 @@ occ-client: server-host: 127.0.0.1 real-port: 2603 un-real-port: 5703 - collector-data: false + collector-data: true monitor-handware-change: false receiveMsgTimeout: 6 connectionOcc: true diff --git a/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java b/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java index 8e35cd6..13d0c52 100644 --- a/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java +++ b/src/test/java/club/joylink/xiannccda/MybatisPlusGenerator.java @@ -29,7 +29,7 @@ public class MybatisPlusGenerator { private static final List includeTableList = new ArrayList<>(); static { - includeTableList.add("fault_query"); + includeTableList.add("mock_ats_collector_data"); // includeTableList.add("relieve_time_shere"); } diff --git a/src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java b/src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java new file mode 100644 index 0000000..127638e --- /dev/null +++ b/src/test/java/club/joylink/xiannccda/occmock/AtsCollectorDataTest.java @@ -0,0 +1,95 @@ +package club.joylink.xiannccda.occmock; + + +import club.joylink.xiannccda.ats.message.FrameSchemaParse; +import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult; +import club.joylink.xiannccda.ats.message.MessageData; +import club.joylink.xiannccda.ats.message.MessageId; +import club.joylink.xiannccda.entity.MockAtsCollectorData; +import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import jakarta.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +public class AtsCollectorDataTest { + + static final short Flag_Multi = 1; + @Resource + private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository; + + @Test + public void loadEncodeTest() throws Exception { + List collDataList = this.mockAtsCollectorDataRepository.list(); + for (MockAtsCollectorData cd : collDataList) { + ByteBuf in = Unpooled.wrappedBuffer(cd.getSourceDate()); + ByteBuf out = Unpooled.buffer(); + + if (schemaParse(in, out)) { + List messages = new ArrayList(); + messageDecode(out, messages); + for (MessageData message : messages) { + System.out.println(message); + } + + } + } + } + + private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) { + inByteBuf.markReaderIndex(); + int totalReadables = inByteBuf.readableBytes(); + if (totalReadables < 4) { +// inByteBuf.resetReaderIndex(); + + return false; + } + short sysId = inByteBuf.readUnsignedByte(); + int totalLength = inByteBuf.readUnsignedShort(); + short flag = inByteBuf.readUnsignedByte(); + int contentLen = totalLength - 1; + int readables = inByteBuf.readableBytes(); + if (readables < contentLen) { + inByteBuf.resetReaderIndex(); + + return false; + } + byte[] bb = new byte[contentLen]; + inByteBuf.readBytes(bb); + packageData.writeBytes(bb); + + if (flag == Flag_Multi) { + schemaParse(inByteBuf, packageData); + } + return true; + } + + private static void messageDecode(ByteBuf buf, List messages) throws Exception { + int len = buf.getUnsignedShort(0); + byte[] msgData = new byte[len + 2]; + buf.readBytes(msgData); + ByteBuf msgBuf = Unpooled.wrappedBuffer(msgData); + + int msgId = msgBuf.getUnsignedShort(8); + MessageId messageId = MessageId.of(msgId); + if (messageId.equals(MessageId.UNKNOWN)) { + throw new Exception("未知的消息id"); + } + if (messageId.omc == null || messageId.omc.create() == null) { + throw new Exception( + String.format("id=%s的消息没有消息对象创建接口omc或接口返回null", messageId)); + } + MessageData message = messageId.omc.create(); + message.setMsgId(messageId); + message.decode(msgBuf); + messages.add(message); + + if (buf.readableBytes() > 0) { + messageDecode(buf, messages); + } + } +}