ats数据收集调整
This commit is contained in:
parent
d785915902
commit
e3bf6d927d
@ -8,6 +8,9 @@ import io.netty.buffer.Unpooled;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@ -19,6 +22,8 @@ public class FrameSchemaParse {
|
||||
* key = 端口号(表明是实时还是非实时)
|
||||
*/
|
||||
private static final Map<Integer, ByteBuf> PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Integer, ByteBuf> SOURCE_PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
/*private static final ByteBuf INPUT_DATA_BUF = Unpooled.buffer();
|
||||
|
||||
public static List<MessageData> decode2(ByteBuf inByteBuf, Integer port) {
|
||||
@ -38,18 +43,27 @@ public class FrameSchemaParse {
|
||||
return null;
|
||||
}*/
|
||||
|
||||
public static List<MessageData> decode(ByteBuf inByteBuf, Integer port) {
|
||||
|
||||
public static DecodeResult decode(ByteBuf inByteBuf, Integer port) {
|
||||
ByteBuf packageData = PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
|
||||
boolean readComplate = schemaParse(inByteBuf, packageData);
|
||||
ByteBuf sourcePackageData = SOURCE_PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
|
||||
boolean readComplate = schemaParse(inByteBuf, packageData, sourcePackageData);
|
||||
if (readComplate) {
|
||||
try {
|
||||
DecodeResult dr = new DecodeResult();
|
||||
List<MessageData> messageDataList = Lists.newArrayList();
|
||||
messageDecode(packageData, messageDataList);
|
||||
return messageDataList;
|
||||
dr.data = messageDataList;
|
||||
byte[] data = new byte[sourcePackageData.readableBytes()];
|
||||
sourcePackageData.readBytes(data);
|
||||
dr.sourceData = data;
|
||||
|
||||
return dr;
|
||||
} catch (Exception e) {
|
||||
log.error("OCC消息解析异常", e);
|
||||
return null;
|
||||
} finally {
|
||||
sourcePackageData.clear();
|
||||
packageData.clear();
|
||||
}
|
||||
}
|
||||
@ -82,7 +96,7 @@ public class FrameSchemaParse {
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
|
||||
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData, ByteBuf sourcePackageData) {
|
||||
inByteBuf.markReaderIndex();
|
||||
int totalReadables = inByteBuf.readableBytes();
|
||||
if (totalReadables < 4) {
|
||||
@ -100,9 +114,23 @@ public class FrameSchemaParse {
|
||||
log.error("可读内容不足 sysId[{}] totalLen[{}] flag[{}] contentLen[{}] readableBytes[{}]", sysId, totalLength, flag, contentLen, readables);
|
||||
return false;
|
||||
}
|
||||
// byte[] bb = new byte[contentLen];
|
||||
// inByteBuf.readBytes(bb);
|
||||
packageData.writeBytes(inByteBuf, contentLen);
|
||||
byte[] bb = new byte[contentLen];
|
||||
inByteBuf.readBytes(bb);
|
||||
packageData.writeBytes(bb);
|
||||
// packageData.writeBytes(inByteBuf, contentLen);
|
||||
sourcePackageData.writeByte(sysId);
|
||||
sourcePackageData.writeShort(totalLength);
|
||||
sourcePackageData.writeByte(flag);
|
||||
sourcePackageData.writeBytes(bb);
|
||||
return flag != Flag_Multi;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public static class DecodeResult {
|
||||
|
||||
private List<MessageData> data;
|
||||
private byte[] sourceData;
|
||||
}
|
||||
}
|
||||
|
@ -66,6 +66,7 @@ public abstract class MessageData {
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void decode(ByteBuf buf) throws Exception {
|
||||
final int headerBytes = 10;
|
||||
int readableBytes = buf.readableBytes();
|
||||
@ -122,9 +123,12 @@ public abstract class MessageData {
|
||||
private Long id;
|
||||
private List<MessageData> messages;
|
||||
|
||||
public DecodeMessageData(Long id, List<MessageData> messages) {
|
||||
private byte[] sourceData;
|
||||
|
||||
public DecodeMessageData(Long id, List<MessageData> messages, byte[] sourceData) {
|
||||
this.id = id;
|
||||
this.messages = messages;
|
||||
this.sourceData = sourceData;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -178,9 +178,9 @@ public enum MessageId {
|
||||
/**
|
||||
* 消息对象创建接口
|
||||
*/
|
||||
OccMessageCreate omc;
|
||||
public OccMessageCreate omc;
|
||||
|
||||
interface OccMessageCreate {
|
||||
public interface OccMessageCreate {
|
||||
|
||||
MessageData create();
|
||||
}
|
||||
|
@ -1,11 +1,14 @@
|
||||
package club.joylink.xiannccda.ats.message;
|
||||
|
||||
import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
|
||||
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import lombok.Getter;
|
||||
|
||||
public class OccMessageDecoder extends ByteToMessageDecoder {
|
||||
@ -26,18 +29,9 @@ public class OccMessageDecoder extends ByteToMessageDecoder {
|
||||
connection.lastReceiveMessageTime = System.currentTimeMillis();
|
||||
connection.receiveMessageLatest = System.currentTimeMillis();
|
||||
Long id = IdUtil.getSnowflake().nextId();
|
||||
|
||||
// List<MessageData> messages = FrameSchema.decode2(in);
|
||||
List<MessageData> messages = FrameSchemaParse.decode(in, this.connection.port);
|
||||
if (!(messages == null || messages.size() == 0)) {
|
||||
// System.out.println(
|
||||
// String.format(
|
||||
// "收到OCC消息: %s 来自 port: %s", this.connection.port,
|
||||
// JSON.toJSONString(
|
||||
// messages.get(0), Feature.PrettyFormat, Feature.FieldBased, Feature.WriteNulls)));
|
||||
|
||||
// out.add(messages);
|
||||
out.add(List.of(new DecodeMessageData(id, messages)));
|
||||
DecodeResult dr = FrameSchemaParse.decode(in, this.connection.port);
|
||||
if (Objects.nonNull(dr) && CollectionUtils.isNotEmpty(dr.getData())) {
|
||||
out.add(List.of(new DecodeMessageData(id, dr.getData(), dr.getSourceData())));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -67,8 +67,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler<List<DecodeMe
|
||||
}
|
||||
|
||||
if (this.collectorData) {
|
||||
// MockAppContext.publishCollectorAtsData(messageData);
|
||||
MockAppContext.publish(messageData.getMessages());
|
||||
MockAppContext.publishCollectorAtsData(messageData);
|
||||
}
|
||||
DeviceStatusConvertorManager.doConvertor(this.connection.getNameChanger(), messageData.getMessages());
|
||||
}
|
||||
|
@ -0,0 +1,44 @@
|
||||
package club.joylink.xiannccda.entity;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import java.time.LocalDateTime;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author walker-sheng
|
||||
* @since 2024-11-19
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@Accessors(chain = true)
|
||||
@TableName("mock_ats_collector_data")
|
||||
@Schema(name = "MockAtsCollectorData")
|
||||
public class MockAtsCollectorData {
|
||||
|
||||
@TableId(value = "id")
|
||||
private Long id;
|
||||
|
||||
@Schema(description = "线路id")
|
||||
private Integer lineId;
|
||||
@Schema(description = "ats原始数据")
|
||||
private byte[] sourceDate;
|
||||
|
||||
private LocalDateTime createTime;
|
||||
|
||||
public static final String ID = "id";
|
||||
|
||||
public static final String LINE_ID = "line_id";
|
||||
|
||||
public static final String SOURCE_DATE = "source_date";
|
||||
|
||||
public static final String CREATE_TIME = "create_time";
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package club.joylink.xiannccda.event;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
public class MockCollectorMessageDataEvent extends ApplicationEvent {
|
||||
|
||||
@Getter
|
||||
private Integer lineId;
|
||||
@Getter
|
||||
private Long tableId;
|
||||
|
||||
public MockCollectorMessageDataEvent(Object source, Integer lineId, Long tableId) {
|
||||
super(source);
|
||||
this.lineId = lineId;
|
||||
this.tableId = tableId;
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package club.joylink.xiannccda.event;
|
||||
|
||||
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
|
||||
import jakarta.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class MockCollectorMessageHandle {
|
||||
|
||||
@Resource
|
||||
private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
|
||||
|
||||
@Async("nsExecutor")
|
||||
@EventListener
|
||||
public void mockData(MockCollectorMessageDataEvent event) {
|
||||
MockAtsCollectorData collectorData = new MockAtsCollectorData();
|
||||
collectorData.setId(event.getTableId());
|
||||
collectorData.setLineId(event.getLineId());
|
||||
collectorData.setSourceDate((byte[]) event.getSource());
|
||||
collectorData.setCreateTime(LocalDateTime.now());
|
||||
this.mockAtsCollectorDataRepository.save(collectorData);
|
||||
}
|
||||
}
|
@ -1,10 +0,0 @@
|
||||
package club.joylink.xiannccda.mock.collector.event;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
public class MockCollectorMessageDataEvent extends ApplicationEvent {
|
||||
|
||||
public MockCollectorMessageDataEvent(Object source) {
|
||||
super(source);
|
||||
}
|
||||
}
|
@ -1,14 +0,0 @@
|
||||
package club.joylink.xiannccda.mock.collector.event;
|
||||
|
||||
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
|
||||
public class MockCollectorMessageHandle {
|
||||
|
||||
@Async("nsExecutor")
|
||||
@EventListener
|
||||
public void mockData(MockMessageDataEvent event) {
|
||||
|
||||
}
|
||||
}
|
@ -3,27 +3,22 @@ package club.joylink.xiannccda.mock.message;
|
||||
import club.joylink.xiannccda.ats.message.MessageData;
|
||||
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
|
||||
import club.joylink.xiannccda.ats.message.MessageId;
|
||||
import club.joylink.xiannccda.ats.message.MessageResponse;
|
||||
import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
|
||||
import club.joylink.xiannccda.constants.SystemContext;
|
||||
import club.joylink.xiannccda.mock.collector.event.MockCollectorMessageDataEvent;
|
||||
import club.joylink.xiannccda.event.MockCollectorMessageDataEvent;
|
||||
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
public class MockAppContext {
|
||||
|
||||
public static void publishCollectorAtsData(DecodeMessageData nessageData) {
|
||||
// List<MessageData> newList = datas.stream().filter(d -> d.getMsgId() != MessageId.MESSAGE_POLLING).toList();
|
||||
SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData));
|
||||
MessageData findMsg = nessageData.getMessages().stream().filter(d -> !(d instanceof HeartBeatMsg)).findFirst().orElseGet(null);
|
||||
if (Objects.nonNull(findMsg) && findMsg instanceof MessageResponse mr) {
|
||||
SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData.getSourceData(), mr.getLineId().intValue(), nessageData.getId()));
|
||||
}
|
||||
}
|
||||
|
||||
public static void publish(List<MessageData> datas) {
|
||||
|
@ -0,0 +1,16 @@
|
||||
package club.joylink.xiannccda.repository;
|
||||
|
||||
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 服务类
|
||||
* </p>
|
||||
*
|
||||
* @author walker-sheng
|
||||
* @since 2024-11-19
|
||||
*/
|
||||
public interface IMockAtsCollectorDataRepository extends IService<MockAtsCollectorData> {
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package club.joylink.xiannccda.repository.impl;
|
||||
|
||||
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||
import club.joylink.xiannccda.mapper.MockAtsCollectorDataMapper;
|
||||
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 服务实现类
|
||||
* </p>
|
||||
*
|
||||
* @author walker-sheng
|
||||
* @since 2024-11-19
|
||||
*/
|
||||
@Service
|
||||
public class MockAtsCollectorDataRepository extends ServiceImpl<MockAtsCollectorDataMapper, MockAtsCollectorData> implements IMockAtsCollectorDataRepository {
|
||||
|
||||
}
|
@ -21,7 +21,7 @@ occ-client:
|
||||
server-host: 127.0.0.1
|
||||
real-port: 2603
|
||||
un-real-port: 5703
|
||||
collector-data: false
|
||||
collector-data: true
|
||||
monitor-handware-change: false
|
||||
receiveMsgTimeout: 6
|
||||
connectionOcc: true
|
||||
|
@ -29,7 +29,7 @@ public class MybatisPlusGenerator {
|
||||
private static final List<String> includeTableList = new ArrayList<>();
|
||||
|
||||
static {
|
||||
includeTableList.add("fault_query");
|
||||
includeTableList.add("mock_ats_collector_data");
|
||||
// includeTableList.add("relieve_time_shere");
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,95 @@
|
||||
package club.joylink.xiannccda.occmock;
|
||||
|
||||
|
||||
import club.joylink.xiannccda.ats.message.FrameSchemaParse;
|
||||
import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
|
||||
import club.joylink.xiannccda.ats.message.MessageData;
|
||||
import club.joylink.xiannccda.ats.message.MessageId;
|
||||
import club.joylink.xiannccda.entity.MockAtsCollectorData;
|
||||
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import jakarta.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@SpringBootTest
|
||||
public class AtsCollectorDataTest {
|
||||
|
||||
static final short Flag_Multi = 1;
|
||||
@Resource
|
||||
private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
|
||||
|
||||
@Test
|
||||
public void loadEncodeTest() throws Exception {
|
||||
List<MockAtsCollectorData> collDataList = this.mockAtsCollectorDataRepository.list();
|
||||
for (MockAtsCollectorData cd : collDataList) {
|
||||
ByteBuf in = Unpooled.wrappedBuffer(cd.getSourceDate());
|
||||
ByteBuf out = Unpooled.buffer();
|
||||
|
||||
if (schemaParse(in, out)) {
|
||||
List<MessageData> messages = new ArrayList<MessageData>();
|
||||
messageDecode(out, messages);
|
||||
for (MessageData message : messages) {
|
||||
System.out.println(message);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
|
||||
inByteBuf.markReaderIndex();
|
||||
int totalReadables = inByteBuf.readableBytes();
|
||||
if (totalReadables < 4) {
|
||||
// inByteBuf.resetReaderIndex();
|
||||
|
||||
return false;
|
||||
}
|
||||
short sysId = inByteBuf.readUnsignedByte();
|
||||
int totalLength = inByteBuf.readUnsignedShort();
|
||||
short flag = inByteBuf.readUnsignedByte();
|
||||
int contentLen = totalLength - 1;
|
||||
int readables = inByteBuf.readableBytes();
|
||||
if (readables < contentLen) {
|
||||
inByteBuf.resetReaderIndex();
|
||||
|
||||
return false;
|
||||
}
|
||||
byte[] bb = new byte[contentLen];
|
||||
inByteBuf.readBytes(bb);
|
||||
packageData.writeBytes(bb);
|
||||
|
||||
if (flag == Flag_Multi) {
|
||||
schemaParse(inByteBuf, packageData);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static void messageDecode(ByteBuf buf, List<MessageData> messages) throws Exception {
|
||||
int len = buf.getUnsignedShort(0);
|
||||
byte[] msgData = new byte[len + 2];
|
||||
buf.readBytes(msgData);
|
||||
ByteBuf msgBuf = Unpooled.wrappedBuffer(msgData);
|
||||
|
||||
int msgId = msgBuf.getUnsignedShort(8);
|
||||
MessageId messageId = MessageId.of(msgId);
|
||||
if (messageId.equals(MessageId.UNKNOWN)) {
|
||||
throw new Exception("未知的消息id");
|
||||
}
|
||||
if (messageId.omc == null || messageId.omc.create() == null) {
|
||||
throw new Exception(
|
||||
String.format("id=%s的消息没有消息对象创建接口omc或接口返回null", messageId));
|
||||
}
|
||||
MessageData message = messageId.omc.create();
|
||||
message.setMsgId(messageId);
|
||||
message.decode(msgBuf);
|
||||
messages.add(message);
|
||||
|
||||
if (buf.readableBytes() > 0) {
|
||||
messageDecode(buf, messages);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user