Compare commits
2 Commits
d785915902
...
05ad69fc8f
Author | SHA1 | Date | |
---|---|---|---|
|
05ad69fc8f | ||
|
e3bf6d927d |
@ -8,6 +8,9 @@ import io.netty.buffer.Unpooled;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -19,6 +22,8 @@ public class FrameSchemaParse {
|
|||||||
* key = 端口号(表明是实时还是非实时)
|
* key = 端口号(表明是实时还是非实时)
|
||||||
*/
|
*/
|
||||||
private static final Map<Integer, ByteBuf> PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
|
private static final Map<Integer, ByteBuf> PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
|
||||||
|
private static final Map<Integer, ByteBuf> SOURCE_PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/*private static final ByteBuf INPUT_DATA_BUF = Unpooled.buffer();
|
/*private static final ByteBuf INPUT_DATA_BUF = Unpooled.buffer();
|
||||||
|
|
||||||
public static List<MessageData> decode2(ByteBuf inByteBuf, Integer port) {
|
public static List<MessageData> decode2(ByteBuf inByteBuf, Integer port) {
|
||||||
@ -38,18 +43,27 @@ public class FrameSchemaParse {
|
|||||||
return null;
|
return null;
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
public static List<MessageData> decode(ByteBuf inByteBuf, Integer port) {
|
|
||||||
|
public static DecodeResult decode(ByteBuf inByteBuf, Integer port) {
|
||||||
ByteBuf packageData = PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
|
ByteBuf packageData = PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
|
||||||
boolean readComplate = schemaParse(inByteBuf, packageData);
|
ByteBuf sourcePackageData = SOURCE_PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
|
||||||
|
boolean readComplate = schemaParse(inByteBuf, packageData, sourcePackageData);
|
||||||
if (readComplate) {
|
if (readComplate) {
|
||||||
try {
|
try {
|
||||||
|
DecodeResult dr = new DecodeResult();
|
||||||
List<MessageData> messageDataList = Lists.newArrayList();
|
List<MessageData> messageDataList = Lists.newArrayList();
|
||||||
messageDecode(packageData, messageDataList);
|
messageDecode(packageData, messageDataList);
|
||||||
return messageDataList;
|
dr.data = messageDataList;
|
||||||
|
byte[] data = new byte[sourcePackageData.readableBytes()];
|
||||||
|
sourcePackageData.readBytes(data);
|
||||||
|
dr.sourceData = data;
|
||||||
|
|
||||||
|
return dr;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("OCC消息解析异常", e);
|
log.error("OCC消息解析异常", e);
|
||||||
return null;
|
return null;
|
||||||
} finally {
|
} finally {
|
||||||
|
sourcePackageData.clear();
|
||||||
packageData.clear();
|
packageData.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,7 +96,7 @@ public class FrameSchemaParse {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
|
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData, ByteBuf sourcePackageData) {
|
||||||
inByteBuf.markReaderIndex();
|
inByteBuf.markReaderIndex();
|
||||||
int totalReadables = inByteBuf.readableBytes();
|
int totalReadables = inByteBuf.readableBytes();
|
||||||
if (totalReadables < 4) {
|
if (totalReadables < 4) {
|
||||||
@ -100,9 +114,23 @@ public class FrameSchemaParse {
|
|||||||
log.error("可读内容不足 sysId[{}] totalLen[{}] flag[{}] contentLen[{}] readableBytes[{}]", sysId, totalLength, flag, contentLen, readables);
|
log.error("可读内容不足 sysId[{}] totalLen[{}] flag[{}] contentLen[{}] readableBytes[{}]", sysId, totalLength, flag, contentLen, readables);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// byte[] bb = new byte[contentLen];
|
byte[] bb = new byte[contentLen];
|
||||||
// inByteBuf.readBytes(bb);
|
inByteBuf.readBytes(bb);
|
||||||
packageData.writeBytes(inByteBuf, contentLen);
|
packageData.writeBytes(bb);
|
||||||
|
// packageData.writeBytes(inByteBuf, contentLen);
|
||||||
|
sourcePackageData.writeByte(sysId);
|
||||||
|
sourcePackageData.writeShort(totalLength);
|
||||||
|
sourcePackageData.writeByte(flag);
|
||||||
|
sourcePackageData.writeBytes(bb);
|
||||||
return flag != Flag_Multi;
|
return flag != Flag_Multi;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public static class DecodeResult {
|
||||||
|
|
||||||
|
private List<MessageData> data;
|
||||||
|
private byte[] sourceData;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,6 +66,7 @@ public abstract class MessageData {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void decode(ByteBuf buf) throws Exception {
|
public void decode(ByteBuf buf) throws Exception {
|
||||||
final int headerBytes = 10;
|
final int headerBytes = 10;
|
||||||
int readableBytes = buf.readableBytes();
|
int readableBytes = buf.readableBytes();
|
||||||
@ -122,9 +123,12 @@ public abstract class MessageData {
|
|||||||
private Long id;
|
private Long id;
|
||||||
private List<MessageData> messages;
|
private List<MessageData> messages;
|
||||||
|
|
||||||
public DecodeMessageData(Long id, List<MessageData> messages) {
|
private byte[] sourceData;
|
||||||
|
|
||||||
|
public DecodeMessageData(Long id, List<MessageData> messages, byte[] sourceData) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.messages = messages;
|
this.messages = messages;
|
||||||
|
this.sourceData = sourceData;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -178,9 +178,9 @@ public enum MessageId {
|
|||||||
/**
|
/**
|
||||||
* 消息对象创建接口
|
* 消息对象创建接口
|
||||||
*/
|
*/
|
||||||
OccMessageCreate omc;
|
public OccMessageCreate omc;
|
||||||
|
|
||||||
interface OccMessageCreate {
|
public interface OccMessageCreate {
|
||||||
|
|
||||||
MessageData create();
|
MessageData create();
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,14 @@
|
|||||||
package club.joylink.xiannccda.ats.message;
|
package club.joylink.xiannccda.ats.message;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
|
||||||
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
|
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
|
||||||
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.IdUtil;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|
||||||
public class OccMessageDecoder extends ByteToMessageDecoder {
|
public class OccMessageDecoder extends ByteToMessageDecoder {
|
||||||
@ -26,18 +29,9 @@ public class OccMessageDecoder extends ByteToMessageDecoder {
|
|||||||
connection.lastReceiveMessageTime = System.currentTimeMillis();
|
connection.lastReceiveMessageTime = System.currentTimeMillis();
|
||||||
connection.receiveMessageLatest = System.currentTimeMillis();
|
connection.receiveMessageLatest = System.currentTimeMillis();
|
||||||
Long id = IdUtil.getSnowflake().nextId();
|
Long id = IdUtil.getSnowflake().nextId();
|
||||||
|
DecodeResult dr = FrameSchemaParse.decode(in, this.connection.port);
|
||||||
// List<MessageData> messages = FrameSchema.decode2(in);
|
if (Objects.nonNull(dr) && CollectionUtils.isNotEmpty(dr.getData())) {
|
||||||
List<MessageData> messages = FrameSchemaParse.decode(in, this.connection.port);
|
out.add(List.of(new DecodeMessageData(id, dr.getData(), dr.getSourceData())));
|
||||||
if (!(messages == null || messages.size() == 0)) {
|
|
||||||
// System.out.println(
|
|
||||||
// String.format(
|
|
||||||
// "收到OCC消息: %s 来自 port: %s", this.connection.port,
|
|
||||||
// JSON.toJSONString(
|
|
||||||
// messages.get(0), Feature.PrettyFormat, Feature.FieldBased, Feature.WriteNulls)));
|
|
||||||
|
|
||||||
// out.add(messages);
|
|
||||||
out.add(List.of(new DecodeMessageData(id, messages)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,8 +67,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler<List<DecodeMe
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (this.collectorData) {
|
if (this.collectorData) {
|
||||||
// MockAppContext.publishCollectorAtsData(messageData);
|
MockAppContext.publishCollectorAtsData(messageData);
|
||||||
MockAppContext.publish(messageData.getMessages());
|
|
||||||
}
|
}
|
||||||
DeviceStatusConvertorManager.doConvertor(this.connection.getNameChanger(), messageData.getMessages());
|
DeviceStatusConvertorManager.doConvertor(this.connection.getNameChanger(), messageData.getMessages());
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,44 @@
|
|||||||
|
package club.joylink.xiannccda.entity;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
*
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author walker-sheng
|
||||||
|
* @since 2024-11-19
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@Accessors(chain = true)
|
||||||
|
@TableName("mock_ats_collector_data")
|
||||||
|
@Schema(name = "MockAtsCollectorData")
|
||||||
|
public class MockAtsCollectorData {
|
||||||
|
|
||||||
|
@TableId(value = "id")
|
||||||
|
private Long id;
|
||||||
|
|
||||||
|
@Schema(description = "线路id")
|
||||||
|
private Integer lineId;
|
||||||
|
@Schema(description = "ats原始数据")
|
||||||
|
private byte[] sourceDate;
|
||||||
|
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
public static final String ID = "id";
|
||||||
|
|
||||||
|
public static final String LINE_ID = "line_id";
|
||||||
|
|
||||||
|
public static final String SOURCE_DATE = "source_date";
|
||||||
|
|
||||||
|
public static final String CREATE_TIME = "create_time";
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
package club.joylink.xiannccda.event;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
|
public class MockCollectorMessageDataEvent extends ApplicationEvent {
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private Integer lineId;
|
||||||
|
@Getter
|
||||||
|
private Long tableId;
|
||||||
|
|
||||||
|
public MockCollectorMessageDataEvent(Object source, Integer lineId, Long tableId) {
|
||||||
|
super(source);
|
||||||
|
this.lineId = lineId;
|
||||||
|
this.tableId = tableId;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
package club.joylink.xiannccda.event;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||||
|
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class MockCollectorMessageHandle {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
|
||||||
|
|
||||||
|
@Async("nsExecutor")
|
||||||
|
@EventListener
|
||||||
|
public void mockData(MockCollectorMessageDataEvent event) {
|
||||||
|
MockAtsCollectorData collectorData = new MockAtsCollectorData();
|
||||||
|
collectorData.setId(event.getTableId());
|
||||||
|
collectorData.setLineId(event.getLineId());
|
||||||
|
collectorData.setSourceDate((byte[]) event.getSource());
|
||||||
|
collectorData.setCreateTime(LocalDateTime.now());
|
||||||
|
this.mockAtsCollectorDataRepository.save(collectorData);
|
||||||
|
}
|
||||||
|
}
|
@ -1,10 +0,0 @@
|
|||||||
package club.joylink.xiannccda.mock.collector.event;
|
|
||||||
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
|
||||||
|
|
||||||
public class MockCollectorMessageDataEvent extends ApplicationEvent {
|
|
||||||
|
|
||||||
public MockCollectorMessageDataEvent(Object source) {
|
|
||||||
super(source);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,14 +0,0 @@
|
|||||||
package club.joylink.xiannccda.mock.collector.event;
|
|
||||||
|
|
||||||
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
|
|
||||||
import org.springframework.context.event.EventListener;
|
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
|
|
||||||
public class MockCollectorMessageHandle {
|
|
||||||
|
|
||||||
@Async("nsExecutor")
|
|
||||||
@EventListener
|
|
||||||
public void mockData(MockMessageDataEvent event) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,27 +3,22 @@ package club.joylink.xiannccda.mock.message;
|
|||||||
import club.joylink.xiannccda.ats.message.MessageData;
|
import club.joylink.xiannccda.ats.message.MessageData;
|
||||||
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
|
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
|
||||||
import club.joylink.xiannccda.ats.message.MessageId;
|
import club.joylink.xiannccda.ats.message.MessageId;
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageResponse;
|
||||||
|
import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
|
||||||
import club.joylink.xiannccda.constants.SystemContext;
|
import club.joylink.xiannccda.constants.SystemContext;
|
||||||
import club.joylink.xiannccda.mock.collector.event.MockCollectorMessageDataEvent;
|
import club.joylink.xiannccda.event.MockCollectorMessageDataEvent;
|
||||||
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
|
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
|
||||||
import com.alibaba.fastjson2.JSON;
|
|
||||||
import java.io.BufferedOutputStream;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import org.springframework.beans.BeansException;
|
|
||||||
import org.springframework.context.ApplicationContext;
|
|
||||||
import org.springframework.context.ApplicationContextAware;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
|
|
||||||
public class MockAppContext {
|
public class MockAppContext {
|
||||||
|
|
||||||
public static void publishCollectorAtsData(DecodeMessageData nessageData) {
|
public static void publishCollectorAtsData(DecodeMessageData nessageData) {
|
||||||
// List<MessageData> newList = datas.stream().filter(d -> d.getMsgId() != MessageId.MESSAGE_POLLING).toList();
|
MessageData findMsg = nessageData.getMessages().stream().filter(d -> !(d instanceof HeartBeatMsg)).findFirst().orElseGet(null);
|
||||||
SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData));
|
if (Objects.nonNull(findMsg) && findMsg instanceof MessageResponse mr) {
|
||||||
|
SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData.getSourceData(), mr.getLineId().intValue(), nessageData.getId()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void publish(List<MessageData> datas) {
|
public static void publish(List<MessageData> datas) {
|
||||||
|
@ -0,0 +1,16 @@
|
|||||||
|
package club.joylink.xiannccda.repository;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 服务类
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author walker-sheng
|
||||||
|
* @since 2024-11-19
|
||||||
|
*/
|
||||||
|
public interface IMockAtsCollectorDataRepository extends IService<MockAtsCollectorData> {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
package club.joylink.xiannccda.repository.impl;
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||||
|
import club.joylink.xiannccda.mapper.MockAtsCollectorDataMapper;
|
||||||
|
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 服务实现类
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author walker-sheng
|
||||||
|
* @since 2024-11-19
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class MockAtsCollectorDataRepository extends ServiceImpl<MockAtsCollectorDataMapper, MockAtsCollectorData> implements IMockAtsCollectorDataRepository {
|
||||||
|
|
||||||
|
}
|
@ -21,7 +21,7 @@ occ-client:
|
|||||||
server-host: 127.0.0.1
|
server-host: 127.0.0.1
|
||||||
real-port: 2603
|
real-port: 2603
|
||||||
un-real-port: 5703
|
un-real-port: 5703
|
||||||
collector-data: false
|
collector-data: true
|
||||||
monitor-handware-change: false
|
monitor-handware-change: false
|
||||||
receiveMsgTimeout: 6
|
receiveMsgTimeout: 6
|
||||||
connectionOcc: true
|
connectionOcc: true
|
||||||
@ -40,7 +40,7 @@ occ-client:
|
|||||||
collector-data: true
|
collector-data: true
|
||||||
monitor-handware-change: false
|
monitor-handware-change: false
|
||||||
receiveMsgTimeout: 6
|
receiveMsgTimeout: 6
|
||||||
connectionOcc: false
|
connectionOcc: true
|
||||||
filterRtuIds:
|
filterRtuIds:
|
||||||
- 10
|
- 10
|
||||||
- 11
|
- 11
|
||||||
|
@ -29,7 +29,7 @@ public class MybatisPlusGenerator {
|
|||||||
private static final List<String> includeTableList = new ArrayList<>();
|
private static final List<String> includeTableList = new ArrayList<>();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
includeTableList.add("fault_query");
|
includeTableList.add("mock_ats_collector_data");
|
||||||
// includeTableList.add("relieve_time_shere");
|
// includeTableList.add("relieve_time_shere");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,95 @@
|
|||||||
|
package club.joylink.xiannccda.occmock;
|
||||||
|
|
||||||
|
|
||||||
|
import club.joylink.xiannccda.ats.message.FrameSchemaParse;
|
||||||
|
import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageData;
|
||||||
|
import club.joylink.xiannccda.ats.message.MessageId;
|
||||||
|
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||||
|
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
|
||||||
|
@SpringBootTest
|
||||||
|
public class AtsCollectorDataTest {
|
||||||
|
|
||||||
|
static final short Flag_Multi = 1;
|
||||||
|
@Resource
|
||||||
|
private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void loadEncodeTest() throws Exception {
|
||||||
|
List<MockAtsCollectorData> collDataList = this.mockAtsCollectorDataRepository.list();
|
||||||
|
for (MockAtsCollectorData cd : collDataList) {
|
||||||
|
ByteBuf in = Unpooled.wrappedBuffer(cd.getSourceDate());
|
||||||
|
ByteBuf out = Unpooled.buffer();
|
||||||
|
|
||||||
|
if (schemaParse(in, out)) {
|
||||||
|
List<MessageData> messages = new ArrayList<MessageData>();
|
||||||
|
messageDecode(out, messages);
|
||||||
|
for (MessageData message : messages) {
|
||||||
|
System.out.println(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
|
||||||
|
inByteBuf.markReaderIndex();
|
||||||
|
int totalReadables = inByteBuf.readableBytes();
|
||||||
|
if (totalReadables < 4) {
|
||||||
|
// inByteBuf.resetReaderIndex();
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
short sysId = inByteBuf.readUnsignedByte();
|
||||||
|
int totalLength = inByteBuf.readUnsignedShort();
|
||||||
|
short flag = inByteBuf.readUnsignedByte();
|
||||||
|
int contentLen = totalLength - 1;
|
||||||
|
int readables = inByteBuf.readableBytes();
|
||||||
|
if (readables < contentLen) {
|
||||||
|
inByteBuf.resetReaderIndex();
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
byte[] bb = new byte[contentLen];
|
||||||
|
inByteBuf.readBytes(bb);
|
||||||
|
packageData.writeBytes(bb);
|
||||||
|
|
||||||
|
if (flag == Flag_Multi) {
|
||||||
|
schemaParse(inByteBuf, packageData);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void messageDecode(ByteBuf buf, List<MessageData> messages) throws Exception {
|
||||||
|
int len = buf.getUnsignedShort(0);
|
||||||
|
byte[] msgData = new byte[len + 2];
|
||||||
|
buf.readBytes(msgData);
|
||||||
|
ByteBuf msgBuf = Unpooled.wrappedBuffer(msgData);
|
||||||
|
|
||||||
|
int msgId = msgBuf.getUnsignedShort(8);
|
||||||
|
MessageId messageId = MessageId.of(msgId);
|
||||||
|
if (messageId.equals(MessageId.UNKNOWN)) {
|
||||||
|
throw new Exception("未知的消息id");
|
||||||
|
}
|
||||||
|
if (messageId.omc == null || messageId.omc.create() == null) {
|
||||||
|
throw new Exception(
|
||||||
|
String.format("id=%s的消息没有消息对象创建接口omc或接口返回null", messageId));
|
||||||
|
}
|
||||||
|
MessageData message = messageId.omc.create();
|
||||||
|
message.setMsgId(messageId);
|
||||||
|
message.decode(msgBuf);
|
||||||
|
messages.add(message);
|
||||||
|
|
||||||
|
if (buf.readableBytes() > 0) {
|
||||||
|
messageDecode(buf, messages);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user