Compare commits

...

2 Commits

Author SHA1 Message Date
tiger_zhou
05ad69fc8f ats数据收集调整
Some checks failed
local-test分支构建docker并发布运行 / Docker-Build (push) Failing after 34s
2024-11-20 08:55:36 +08:00
tiger_zhou
e3bf6d927d ats数据收集调整 2024-11-20 08:55:03 +08:00
16 changed files with 279 additions and 63 deletions

View File

@ -8,6 +8,9 @@ import io.netty.buffer.Unpooled;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -19,6 +22,8 @@ public class FrameSchemaParse {
* key = 端口号(表明是实时还是非实时)
*/
private static final Map<Integer, ByteBuf> PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
private static final Map<Integer, ByteBuf> SOURCE_PACKAGE_DATA_MAP = new ConcurrentHashMap<>();
/*private static final ByteBuf INPUT_DATA_BUF = Unpooled.buffer();
public static List<MessageData> decode2(ByteBuf inByteBuf, Integer port) {
@ -38,18 +43,27 @@ public class FrameSchemaParse {
return null;
}*/
public static List<MessageData> decode(ByteBuf inByteBuf, Integer port) {
public static DecodeResult decode(ByteBuf inByteBuf, Integer port) {
ByteBuf packageData = PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
boolean readComplate = schemaParse(inByteBuf, packageData);
ByteBuf sourcePackageData = SOURCE_PACKAGE_DATA_MAP.computeIfAbsent(port, (d) -> Unpooled.buffer());
boolean readComplate = schemaParse(inByteBuf, packageData, sourcePackageData);
if (readComplate) {
try {
DecodeResult dr = new DecodeResult();
List<MessageData> messageDataList = Lists.newArrayList();
messageDecode(packageData, messageDataList);
return messageDataList;
dr.data = messageDataList;
byte[] data = new byte[sourcePackageData.readableBytes()];
sourcePackageData.readBytes(data);
dr.sourceData = data;
return dr;
} catch (Exception e) {
log.error("OCC消息解析异常", e);
return null;
} finally {
sourcePackageData.clear();
packageData.clear();
}
}
@ -82,7 +96,7 @@ public class FrameSchemaParse {
}
}
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData, ByteBuf sourcePackageData) {
inByteBuf.markReaderIndex();
int totalReadables = inByteBuf.readableBytes();
if (totalReadables < 4) {
@ -100,9 +114,23 @@ public class FrameSchemaParse {
log.error("可读内容不足 sysId[{}] totalLen[{}] flag[{}] contentLen[{}] readableBytes[{}]", sysId, totalLength, flag, contentLen, readables);
return false;
}
// byte[] bb = new byte[contentLen];
// inByteBuf.readBytes(bb);
packageData.writeBytes(inByteBuf, contentLen);
byte[] bb = new byte[contentLen];
inByteBuf.readBytes(bb);
packageData.writeBytes(bb);
// packageData.writeBytes(inByteBuf, contentLen);
sourcePackageData.writeByte(sysId);
sourcePackageData.writeShort(totalLength);
sourcePackageData.writeByte(flag);
sourcePackageData.writeBytes(bb);
return flag != Flag_Multi;
}
@Getter
@AllArgsConstructor
@NoArgsConstructor
public static class DecodeResult {
private List<MessageData> data;
private byte[] sourceData;
}
}

View File

@ -66,6 +66,7 @@ public abstract class MessageData {
}
public void decode(ByteBuf buf) throws Exception {
final int headerBytes = 10;
int readableBytes = buf.readableBytes();
@ -122,9 +123,12 @@ public abstract class MessageData {
private Long id;
private List<MessageData> messages;
public DecodeMessageData(Long id, List<MessageData> messages) {
private byte[] sourceData;
public DecodeMessageData(Long id, List<MessageData> messages, byte[] sourceData) {
this.id = id;
this.messages = messages;
this.sourceData = sourceData;
}
}
}

View File

@ -178,9 +178,9 @@ public enum MessageId {
/**
* 消息对象创建接口
*/
OccMessageCreate omc;
public OccMessageCreate omc;
interface OccMessageCreate {
public interface OccMessageCreate {
MessageData create();
}

View File

@ -1,11 +1,14 @@
package club.joylink.xiannccda.ats.message;
import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import java.util.Objects;
import lombok.Getter;
public class OccMessageDecoder extends ByteToMessageDecoder {
@ -26,18 +29,9 @@ public class OccMessageDecoder extends ByteToMessageDecoder {
connection.lastReceiveMessageTime = System.currentTimeMillis();
connection.receiveMessageLatest = System.currentTimeMillis();
Long id = IdUtil.getSnowflake().nextId();
// List<MessageData> messages = FrameSchema.decode2(in);
List<MessageData> messages = FrameSchemaParse.decode(in, this.connection.port);
if (!(messages == null || messages.size() == 0)) {
// System.out.println(
// String.format(
// "收到OCC消息: %s 来自 port: %s", this.connection.port,
// JSON.toJSONString(
// messages.get(0), Feature.PrettyFormat, Feature.FieldBased, Feature.WriteNulls)));
// out.add(messages);
out.add(List.of(new DecodeMessageData(id, messages)));
DecodeResult dr = FrameSchemaParse.decode(in, this.connection.port);
if (Objects.nonNull(dr) && CollectionUtils.isNotEmpty(dr.getData())) {
out.add(List.of(new DecodeMessageData(id, dr.getData(), dr.getSourceData())));
}
}

View File

@ -67,8 +67,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler<List<DecodeMe
}
if (this.collectorData) {
// MockAppContext.publishCollectorAtsData(messageData);
MockAppContext.publish(messageData.getMessages());
MockAppContext.publishCollectorAtsData(messageData);
}
DeviceStatusConvertorManager.doConvertor(this.connection.getNameChanger(), messageData.getMessages());
}

View File

@ -0,0 +1,44 @@
package club.joylink.xiannccda.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
/**
* <p>
*
* </p>
*
* @author walker-sheng
* @since 2024-11-19
*/
@Getter
@Setter
@Accessors(chain = true)
@TableName("mock_ats_collector_data")
@Schema(name = "MockAtsCollectorData")
public class MockAtsCollectorData {
@TableId(value = "id")
private Long id;
@Schema(description = "线路id")
private Integer lineId;
@Schema(description = "ats原始数据")
private byte[] sourceDate;
private LocalDateTime createTime;
public static final String ID = "id";
public static final String LINE_ID = "line_id";
public static final String SOURCE_DATE = "source_date";
public static final String CREATE_TIME = "create_time";
}

View File

@ -0,0 +1,18 @@
package club.joylink.xiannccda.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
public class MockCollectorMessageDataEvent extends ApplicationEvent {
@Getter
private Integer lineId;
@Getter
private Long tableId;
public MockCollectorMessageDataEvent(Object source, Integer lineId, Long tableId) {
super(source);
this.lineId = lineId;
this.tableId = tableId;
}
}

View File

@ -0,0 +1,27 @@
package club.joylink.xiannccda.event;
import club.joylink.xiannccda.entity.MockAtsCollectorData;
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
import jakarta.annotation.Resource;
import java.time.LocalDateTime;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class MockCollectorMessageHandle {
@Resource
private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
@Async("nsExecutor")
@EventListener
public void mockData(MockCollectorMessageDataEvent event) {
MockAtsCollectorData collectorData = new MockAtsCollectorData();
collectorData.setId(event.getTableId());
collectorData.setLineId(event.getLineId());
collectorData.setSourceDate((byte[]) event.getSource());
collectorData.setCreateTime(LocalDateTime.now());
this.mockAtsCollectorDataRepository.save(collectorData);
}
}

View File

@ -1,10 +0,0 @@
package club.joylink.xiannccda.mock.collector.event;
import org.springframework.context.ApplicationEvent;
public class MockCollectorMessageDataEvent extends ApplicationEvent {
public MockCollectorMessageDataEvent(Object source) {
super(source);
}
}

View File

@ -1,14 +0,0 @@
package club.joylink.xiannccda.mock.collector.event;
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
public class MockCollectorMessageHandle {
@Async("nsExecutor")
@EventListener
public void mockData(MockMessageDataEvent event) {
}
}

View File

@ -3,27 +3,22 @@ package club.joylink.xiannccda.mock.message;
import club.joylink.xiannccda.ats.message.MessageData;
import club.joylink.xiannccda.ats.message.MessageData.DecodeMessageData;
import club.joylink.xiannccda.ats.message.MessageId;
import club.joylink.xiannccda.ats.message.MessageResponse;
import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
import club.joylink.xiannccda.constants.SystemContext;
import club.joylink.xiannccda.mock.collector.event.MockCollectorMessageDataEvent;
import club.joylink.xiannccda.event.MockCollectorMessageDataEvent;
import club.joylink.xiannccda.mock.message.event.MockMessageDataEvent;
import com.alibaba.fastjson2.JSON;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
public class MockAppContext {
public static void publishCollectorAtsData(DecodeMessageData nessageData) {
// List<MessageData> newList = datas.stream().filter(d -> d.getMsgId() != MessageId.MESSAGE_POLLING).toList();
SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData));
MessageData findMsg = nessageData.getMessages().stream().filter(d -> !(d instanceof HeartBeatMsg)).findFirst().orElseGet(null);
if (Objects.nonNull(findMsg) && findMsg instanceof MessageResponse mr) {
SystemContext.publishEvent(new MockCollectorMessageDataEvent(nessageData.getSourceData(), mr.getLineId().intValue(), nessageData.getId()));
}
}
public static void publish(List<MessageData> datas) {

View File

@ -0,0 +1,16 @@
package club.joylink.xiannccda.repository;
import club.joylink.xiannccda.entity.MockAtsCollectorData;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* <p>
* 服务类
* </p>
*
* @author walker-sheng
* @since 2024-11-19
*/
public interface IMockAtsCollectorDataRepository extends IService<MockAtsCollectorData> {
}

View File

@ -0,0 +1,20 @@
package club.joylink.xiannccda.repository.impl;
import club.joylink.xiannccda.entity.MockAtsCollectorData;
import club.joylink.xiannccda.mapper.MockAtsCollectorDataMapper;
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
/**
* <p>
* 服务实现类
* </p>
*
* @author walker-sheng
* @since 2024-11-19
*/
@Service
public class MockAtsCollectorDataRepository extends ServiceImpl<MockAtsCollectorDataMapper, MockAtsCollectorData> implements IMockAtsCollectorDataRepository {
}

View File

@ -21,7 +21,7 @@ occ-client:
server-host: 127.0.0.1
real-port: 2603
un-real-port: 5703
collector-data: false
collector-data: true
monitor-handware-change: false
receiveMsgTimeout: 6
connectionOcc: true
@ -40,7 +40,7 @@ occ-client:
collector-data: true
monitor-handware-change: false
receiveMsgTimeout: 6
connectionOcc: false
connectionOcc: true
filterRtuIds:
- 10
- 11

View File

@ -29,7 +29,7 @@ public class MybatisPlusGenerator {
private static final List<String> includeTableList = new ArrayList<>();
static {
includeTableList.add("fault_query");
includeTableList.add("mock_ats_collector_data");
// includeTableList.add("relieve_time_shere");
}

View File

@ -0,0 +1,95 @@
package club.joylink.xiannccda.occmock;
import club.joylink.xiannccda.ats.message.FrameSchemaParse;
import club.joylink.xiannccda.ats.message.FrameSchemaParse.DecodeResult;
import club.joylink.xiannccda.ats.message.MessageData;
import club.joylink.xiannccda.ats.message.MessageId;
import club.joylink.xiannccda.entity.MockAtsCollectorData;
import club.joylink.xiannccda.repository.IMockAtsCollectorDataRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import jakarta.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class AtsCollectorDataTest {
static final short Flag_Multi = 1;
@Resource
private IMockAtsCollectorDataRepository mockAtsCollectorDataRepository;
@Test
public void loadEncodeTest() throws Exception {
List<MockAtsCollectorData> collDataList = this.mockAtsCollectorDataRepository.list();
for (MockAtsCollectorData cd : collDataList) {
ByteBuf in = Unpooled.wrappedBuffer(cd.getSourceDate());
ByteBuf out = Unpooled.buffer();
if (schemaParse(in, out)) {
List<MessageData> messages = new ArrayList<MessageData>();
messageDecode(out, messages);
for (MessageData message : messages) {
System.out.println(message);
}
}
}
}
private static boolean schemaParse(ByteBuf inByteBuf, ByteBuf packageData) {
inByteBuf.markReaderIndex();
int totalReadables = inByteBuf.readableBytes();
if (totalReadables < 4) {
// inByteBuf.resetReaderIndex();
return false;
}
short sysId = inByteBuf.readUnsignedByte();
int totalLength = inByteBuf.readUnsignedShort();
short flag = inByteBuf.readUnsignedByte();
int contentLen = totalLength - 1;
int readables = inByteBuf.readableBytes();
if (readables < contentLen) {
inByteBuf.resetReaderIndex();
return false;
}
byte[] bb = new byte[contentLen];
inByteBuf.readBytes(bb);
packageData.writeBytes(bb);
if (flag == Flag_Multi) {
schemaParse(inByteBuf, packageData);
}
return true;
}
private static void messageDecode(ByteBuf buf, List<MessageData> messages) throws Exception {
int len = buf.getUnsignedShort(0);
byte[] msgData = new byte[len + 2];
buf.readBytes(msgData);
ByteBuf msgBuf = Unpooled.wrappedBuffer(msgData);
int msgId = msgBuf.getUnsignedShort(8);
MessageId messageId = MessageId.of(msgId);
if (messageId.equals(MessageId.UNKNOWN)) {
throw new Exception("未知的消息id");
}
if (messageId.omc == null || messageId.omc.create() == null) {
throw new Exception(
String.format("id=%s的消息没有消息对象创建接口omc或接口返回null", messageId));
}
MessageData message = messageId.omc.create();
message.setMsgId(messageId);
message.decode(msgBuf);
messages.add(message);
if (buf.readableBytes() > 0) {
messageDecode(buf, messages);
}
}
}