Occ消息编解码逻辑调整

完成OCC消息帧和消息解析逻辑
MessageId添加创建消息对象接口参数
This commit is contained in:
walker 2023-06-06 18:37:44 +08:00
parent 502eb413e5
commit 51ecc2fed8
14 changed files with 227 additions and 43 deletions

View File

@ -4,7 +4,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FrameSchema {
/**
@ -21,6 +23,7 @@ public class FrameSchema {
* 0表示在这帧数据中消息完整发送没有后续帧同样几个消息可合并成一帧数据发送
*/
short multiFlag;
static final short Flag_Multi = 1;
/**
* 消息内容最长为1024字节它可由一个或多个消息组成也可以是一个消息的一部分每个消息由消息标识和消息数据组成
*/
@ -37,6 +40,18 @@ public class FrameSchema {
}
}
public FrameSchema(short systemId, ByteBuf buf) {
this.systemId = systemId;
int packageLength = buf.readableBytes();
this.packageData = buf;
this.totalLength = packageLength + 1;
if (packageLength == PackageMaxLength) {
this.multiFlag = 1;
} else {
this.multiFlag = 0;
}
}
public static int calculateMessageLength(List<MessageData> messages) {
int total = 0;
for (MessageData data : messages) {
@ -82,6 +97,86 @@ public class FrameSchema {
return messages;
}
public static List<MessageData> decode(ByteBuf in) throws Exception {
in.markReaderIndex();
boolean success = true;
try {
List<FrameSchema> frames = new ArrayList<>();
frameDecode(in, frames);
if (frames.size() > 0) {
ByteBuf buf = Unpooled.buffer();
for (FrameSchema frame : frames) {
buf.writeBytes(frame.packageData);
}
List<MessageData> messages = new ArrayList<>();
try {
messageDecode(buf, messages);
} catch (Exception e) {
log.error("OCC消息解析异常", e);
return null;
}
return messages;
} else {
success = false;
}
} catch (Exception e) {
success = false;
throw e;
} finally {
if (!success) {
in.resetReaderIndex();
}
}
return null;
}
/**
* 具体消息解析
*
* @param buf
* @param messages
* @throws Exception
*/
private static void messageDecode(ByteBuf buf, List<MessageData> messages) throws Exception {
int len = buf.getUnsignedShort(0);
ByteBuf msgBuf = buf.readBytes(len + 2);
int msgId = msgBuf.getUnsignedShort(8);
MessageId messageId = MessageId.of(msgId);
if (messageId.equals(MessageId.UNKNOWN)) {
throw new Exception("位置的消息id");
}
MessageData message = messageId.omc.create();
message.decode(msgBuf);
messages.add(message);
if (buf.readableBytes() > 0) {
messageDecode(buf, messages);
}
}
/**
* 信息帧解析
*
* @param in
* @param frames
* @throws Exception
*/
static void frameDecode(ByteBuf in, List<FrameSchema> frames) throws Exception {
if (in.readableBytes() < 4) {
throw new Exception("数据不全");
}
short sysId = in.readUnsignedByte();
int totalLength = in.readUnsignedShort();
short flag = in.readUnsignedByte();
int contentLen = totalLength - 1;
if (in.readableBytes() < contentLen) {
throw new Exception("可读内容不足");
}
frames.add(new FrameSchema(sysId, in.readBytes(contentLen)));
if (flag == Flag_Multi) {
frameDecode(in, frames);
}
}
public void encode(ByteBuf buf) {
buf.writeByte(this.systemId);
buf.writeShort(this.totalLength);

View File

@ -41,6 +41,7 @@ public abstract class MessageData {
this.time = buf.readUnsignedInt();
this.version = buf.readUnsignedShort();
this.msgId = MessageId.of(buf.readShort());
this.decode2(buf);
} else {
throw new Exception(
String.format("OCC消息可读字节数小于%sreadableBytes=%s", headerBytes, readableBytes));

View File

@ -1,40 +1,49 @@
package club.joylink.xiannccda.ats.message;
import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
public enum MessageId {
UNKNOWN(0x0000),
UNKNOWN(0x0000, null),
/**
* 心跳
*/
MESSAGE_POLLING(0x0001),
MESSAGE_POLLING(0x0001, () -> new HeartBeatMsg()),
/**
* 信息源网络状态 OCC->NCC
*/
NETWORK_ALIVE_STATUS(0x0002),
NETWORK_ALIVE_STATUS(0x0002, null),
/**
* 2.7.3 ATS信息请求消息 OCC<-NCC
*/
LOAD_DEVICE_STATUS(0x0003),
LOAD_DEVICE_STATUS(0x0003, null),
/**
* 2.7.4 设备状态全体消息
*/
DEVICE_STATUS_BITMAP(0x0004),
DEVICE_STATUS_BITMAP(0x0004, null),
/**
* 2.7.5 设备状态变化消息
*/
DEVICE_STATUS_CHANGE(0x0005),
DEVICE_STATUS_CHANGE(0x0005, null),
/**
* 2.7.6 自动/人工信号模式消息
*/
SIGNAL_ROUTE_STATUS(0x0006),
SIGNAL_ROUTE_STATUS(0x0006, null),
/**
* 2.7.7 出入库派班计划消息
*/
DEPOT_PLAN(0x0007);
DEPOT_PLAN(0x0007, null);
int val;
OccMessageCreate omc;
MessageId(int val) {
interface OccMessageCreate {
MessageData create();
}
MessageId(int val, OccMessageCreate omc) {
this.val = val;
this.omc = omc;
}
public static MessageId of(int val) {

View File

@ -0,0 +1,29 @@
package club.joylink.xiannccda.ats.message;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter.Feature;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class OccMessageDecoder extends ByteToMessageDecoder {
final XianOccMessagingClient client;
public OccMessageDecoder(XianOccMessagingClient client) {
this.client = client;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
List<MessageData> messages = FrameSchema.decode(in);
if (!(messages == null || messages.size() == 0)) {
System.out.println(
String.format("收到消息: %s",
JSON.toJSONString(messages.get(0), Feature.PrettyFormat, Feature.FieldBased,
Feature.WriteNulls)));
out.add(messages);
}
}
}

View File

@ -2,28 +2,27 @@ package club.joylink.xiannccda.ats.message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FrameHandler extends ByteToMessageCodec<FrameSchema> {
public class OccMessageEncoder extends MessageToByteEncoder<List<MessageData>> {
final XianOccMessagingClient client;
public FrameHandler(XianOccMessagingClient client) {
public OccMessageEncoder(XianOccMessagingClient client) {
this.client = client;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, FrameSchema frameSchema,
protected void encode(ChannelHandlerContext channelHandlerContext, List<MessageData> messages,
ByteBuf byteBuf) throws Exception {
log.info("发送消息到OCC");
List<FrameSchema> frames = FrameSchema.buildFrom(messages);
for (FrameSchema frame : frames) {
frame.encode(byteBuf);
}
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf,
List<Object> list) throws Exception {
log.info("收到OCC消息");
}
}

View File

@ -34,7 +34,8 @@ public class TcpClientConnection {
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new FrameHandler(client));
ch.pipeline().addLast(new OccMessageEncoder(client));
ch.pipeline().addLast(new OccMessageDecoder(client));
}
});
}

View File

@ -1,16 +1,19 @@
package club.joylink.xiannccda.configuration;
import club.joylink.xiannccda.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ProblemDetail;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
@RestControllerAdvice
@Slf4j
public class ResponseExceptionHandler {
@ExceptionHandler(BusinessException.class)
public ProblemDetail businessExceptionHandler(BusinessException e) {
log.error("业务异常", e);
ProblemDetail problemDetail = ProblemDetail.forStatus(HttpStatus.INTERNAL_SERVER_ERROR);
problemDetail.setProperty("code", e.getCode());
problemDetail.setTitle(e.getMessage());
@ -25,6 +28,7 @@ public class ResponseExceptionHandler {
*/
@ExceptionHandler(Exception.class)
public ProblemDetail UncaughtExceptionHandler(Exception e) {
log.error("未处理异常", e);
ProblemDetail problemDetail = ProblemDetail.forStatus(HttpStatus.INTERNAL_SERVER_ERROR);
problemDetail.setTitle("未知错误");
problemDetail.setDetail(e.getMessage());

View File

@ -9,6 +9,8 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.security.Principal;
import org.springframework.security.core.Authentication;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
@ -50,17 +52,18 @@ public class DraftingController {
@SecurityRequirement(name = "jwt")
@Operation(summary = "创建绘图草稿")
@ApiResponse(description = "绘图草稿信息")
public Drafting create(@RequestBody @Validated(Creation.class) Drafting drafting) {
return this.draftingRepository.create(drafting);
public Drafting create(Principal user,
@RequestBody @Validated(Creation.class) Drafting drafting) {
return this.draftingRepository.create(drafting, Integer.valueOf(user.getName()));
}
@PostMapping("/{id}/saveAs")
@SecurityRequirement(name = "jwt")
@Operation(summary = "创建绘图草稿")
@ApiResponse(description = "绘图草稿信息")
public Drafting create(@PathVariable Integer id,
public Drafting create(Principal user, @PathVariable Integer id,
@RequestBody @Validated(Creation.class) Drafting drafting) {
return this.draftingRepository.saveAs(id, drafting);
return this.draftingRepository.saveAs(id, drafting, Integer.valueOf(user.getName()));
}
@GetMapping("/{id}")

View File

@ -36,6 +36,9 @@ public class Drafting {
@NotNull(message = "数据不能为空", groups = {SaveData.class, SaveAs.class})
private byte[] proto;
@Schema(description = "创建用户id")
private Integer creatorId;
@Schema(description = "创建时间")
private LocalDateTime createdAt;

View File

@ -23,9 +23,9 @@ public interface IDraftingRepository extends IService<Drafting> {
*/
Page<Drafting> pageQuery(DraftingQueryDto query);
Drafting create(Drafting drafting);
Drafting create(Drafting drafting, Integer creatorId);
boolean updateDrawData(Integer id, byte[] proto);
Drafting saveAs(Integer id, Drafting drafting);
Drafting saveAs(Integer id, Drafting drafting, Integer creatorId);
}

View File

@ -40,8 +40,9 @@ public class DraftingRepository extends ServiceImpl<DraftingMapper, Drafting> im
}
@Override
public Drafting create(Drafting drafting) {
public Drafting create(Drafting drafting, Integer creatorId) {
drafting.setCreatedAt(LocalDateTime.now());
drafting.setCreatorId(creatorId);
this.save(drafting);
return drafting;
}
@ -56,7 +57,7 @@ public class DraftingRepository extends ServiceImpl<DraftingMapper, Drafting> im
}
@Override
public Drafting saveAs(Integer id, Drafting drafting) {
public Drafting saveAs(Integer id, Drafting drafting, Integer creatorId) {
BusinessExceptionAssertEnum.UNIQUE_FIELD_REPEAT.assertNotTrue(
this.isNameExist(drafting.getName()),
String.format("草稿名称已存在: %s", drafting.getName()));
@ -64,6 +65,7 @@ public class DraftingRepository extends ServiceImpl<DraftingMapper, Drafting> im
old.setId(null);
old.setName(drafting.getName());
old.setProto(drafting.getProto());
old.setCreatorId(creatorId);
this.save(old);
old.setProto(null); // 另存为数据不回传
return old;

View File

@ -3,33 +3,72 @@ package club.joylink.xiannccda.occmock;
import club.joylink.xiannccda.ats.message.FrameSchema;
import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OccHandler extends ByteToMessageCodec<FrameSchema> {
public class OccHandler extends ByteToMessageCodec<List<FrameSchema>> {
static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor();
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接: {}", ctx.channel().id());
ctx.executor().scheduleAtFixedRate(() -> {
log.info("定时发送心跳消息");
List<FrameSchema> frames = FrameSchema.buildFrom(new HeartBeatMsg());
for (FrameSchema frame : frames) {
ctx.channel().write(frame);
}
ctx.channel().flush();
Map<String, Channel> clients = new ConcurrentHashMap<>();
public OccHandler() {
this.startHeartBeatTask();
}
public void startHeartBeatTask() {
Executor.scheduleAtFixedRate(() -> {
clients.forEach((cid, channel) -> {
log.info("定时发送心跳消息");
List<FrameSchema> frames = FrameSchema.buildFrom(new HeartBeatMsg());
channel.writeAndFlush(frames);
// for (FrameSchema frame : frames) {
// channel.write(frame);
// }
// channel.flush();
});
}, 1, 10, TimeUnit.SECONDS);
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, FrameSchema frameSchema,
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
String id = ctx.channel().id().asLongText();
log.info("客户端连接: {}", id);
clients.put(ctx.channel().id().asLongText(), ctx.channel());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
String id = ctx.channel().id().asLongText();
log.info("客户端断开连接: {}", id);
clients.remove(id);
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, List<FrameSchema> frames,
ByteBuf byteBuf) throws Exception {
frameSchema.encode(byteBuf);
for (FrameSchema frame : frames) {
frame.encode(byteBuf);
}
int len = byteBuf.readableBytes();
byte[] bytes = new byte[len];
byteBuf.getBytes(0, bytes);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < bytes.length; i++) {
sb.append("0x" + Integer.toHexString(bytes[i]));
sb.append(" ");
}
System.out.println(String.format("OCC发送消息: %s", sb.toString()));
}
@Override

View File

@ -20,7 +20,6 @@ public class OccServer {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);//创建boos线程组用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//创建worker线程组用于进行SocketChannel的数据读写处理业务逻辑
//创建Handler
final OccHandler serverHandler = new OccHandler();
try {
//创建ServerBootstrap对象
ServerBootstrap b = new ServerBootstrap();
@ -32,7 +31,7 @@ public class OccServer {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
p.addLast(new OccHandler());
}
});

@ -1 +1 @@
Subproject commit 9aa54608cd592ff96fc506e310fb19caff67f809
Subproject commit 86de0cff9ee560f0c7e670184d5baccfdb28ee44