1,修改encode编码接口

2,完善frame定义及构造
3,添加OCC测试服务
4,添加绘图操作保存、另存为等接口
This commit is contained in:
walker 2023-06-06 15:56:11 +08:00
parent 87cae1312d
commit fd617a14b9
12 changed files with 256 additions and 29 deletions

View File

@ -15,11 +15,6 @@ public class FrameHandler extends ByteToMessageCodec<FrameSchema> {
this.client = client;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, FrameSchema frameSchema,
ByteBuf byteBuf) throws Exception {

View File

@ -1,6 +1,8 @@
package club.joylink.xiannccda.ats.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
public class FrameSchema {
@ -8,11 +10,12 @@ public class FrameSchema {
/**
* (System ID)占1字节系统中每台计算机均有唯一的System ID, 此处表示发送方的System ID为0xff
*/
short systemId;
short systemId = 0xff;
/**
* (Total Length) 占2字节表示每个信息帧中消息的总长度+1字节Multi-flag 最长为1025字节如果数据长度大于1024字节剩余的在紧接着的几帧数据中发送
*/
int totalLength;
final static int PackageMaxLength = 1024;
/**
* (Multi-flag)为帧标识位占1字节值为0或者1分别表示下面的意思 1表示在这帧数据中一个完整的消息没有发送结束有后续帧
* 0表示在这帧数据中消息完整发送没有后续帧同样几个消息可合并成一帧数据发送
@ -23,5 +26,66 @@ public class FrameSchema {
*/
ByteBuf packageData;
List<MessageData> messages;
public FrameSchema(ByteBuf buf) {
int packageLength = buf.readableBytes();
this.packageData = buf;
this.totalLength = packageLength + 1;
if (packageLength == PackageMaxLength) {
this.multiFlag = 1;
} else {
this.multiFlag = 0;
}
}
public static int calculateMessageLength(List<MessageData> messages) {
int total = 0;
for (MessageData data : messages) {
total += data.total();
}
return total;
}
public static List<FrameSchema> buildFrom(MessageData... messages) {
return buildFrom(List.of(messages));
}
public static List<FrameSchema> buildFrom(List<MessageData> messages) {
final int messageLength = calculateMessageLength(messages);
int size = 1;
if (messageLength > PackageMaxLength) {
size = messageLength / PackageMaxLength + 1;
}
ByteBuf buf = Unpooled.buffer(messageLength);
for (MessageData data : messages) {
data.encode(buf);
}
List<FrameSchema> frames = new ArrayList<>();
for (int i = 1; i <= size; i++) {
FrameSchema frame;
if (i == size) {
frame = new FrameSchema(buf.readBytes(messageLength % PackageMaxLength));
} else {
frame = new FrameSchema(buf.readBytes(PackageMaxLength));
}
frames.add(frame);
}
return frames;
}
public static List<MessageData> decodeFrom(FrameSchema... frames) {
return decodeFrom(List.of(frames));
}
public static List<MessageData> decodeFrom(List<FrameSchema> frames) {
List<MessageData> messages = new ArrayList<>();
return messages;
}
public void encode(ByteBuf buf) {
buf.writeByte(this.systemId);
buf.writeShort(this.totalLength);
buf.writeByte(this.multiFlag);
buf.writeBytes(this.packageData);
}
}

View File

@ -22,6 +22,13 @@ public abstract class MessageData {
*/
MessageId msgId;
public MessageData(MessageId msgId, int contentLength) {
this.length = 8 + contentLength;
this.time = System.currentTimeMillis();
this.version = 0x01;
this.msgId = msgId;
}
public void decode(ByteBuf buf) throws Exception {
final int headerBytes = 10;
int readableBytes = buf.readableBytes();
@ -44,24 +51,13 @@ public abstract class MessageData {
*/
public abstract void decode2(ByteBuf buf) throws Exception;
/**
* 消息总长度: 时间戳Time长度+ 版本号Version长度+消息IDmessage _id长度2字节+ 消息内容content长度
*
* @return
*/
public int messageLength() {
return 8;
}
public ByteBuf encode() {
final int messageLength = this.messageLength();
ByteBuf buf = Unpooled.buffer(messageLength + 2);
public void encode(ByteBuf buf) {
final int messageLength = this.length;
buf.writeShort(messageLength);
buf.writeInt((int) this.time);
buf.writeShort(this.version);
buf.writeShort(this.msgId.val);
this.encode2(buf);
return buf;
}
/**
@ -72,4 +68,7 @@ public abstract class MessageData {
protected void encode2(ByteBuf buf) {
}
public int total() {
return this.length + 2;
}
}

View File

@ -21,5 +21,9 @@ public class OccMessageManage implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
// 读取数据配置创建客户端
this.registerClient(new XianOccMessagingClient(3, "localhost"));
for (XianOccMessagingClient client : this.clientMap.values()) {
client.connection.connect();
}
}
}

View File

@ -52,9 +52,10 @@ public class TcpClientConnection {
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("与OCC服务连接异常", e);
} finally {
log.info("关闭eventLoop");
group.shutdownGracefully();
}
}
// finally {
// log.info("关闭eventLoop");
// group.shutdownGracefully();
// }
}
}

View File

@ -1,10 +1,15 @@
package club.joylink.xiannccda.ats.message.line3;
import club.joylink.xiannccda.ats.message.MessageData;
import club.joylink.xiannccda.ats.message.MessageId;
import io.netty.buffer.ByteBuf;
public class HeartBeatMsg extends MessageData {
public HeartBeatMsg() {
super(MessageId.MESSAGE_POLLING, 0);
}
@Override
public void decode2(ByteBuf buf) throws Exception {

View File

@ -5,7 +5,6 @@ import club.joylink.xiannccda.entity.Drafting;
import club.joylink.xiannccda.entity.Drafting.Creation;
import club.joylink.xiannccda.repository.IDraftingRepository;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
@ -15,9 +14,9 @@ import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@ -55,6 +54,15 @@ public class DraftingController {
return this.draftingRepository.create(drafting);
}
@PostMapping("/{id}/saveAs")
@SecurityRequirement(name = "jwt")
@Operation(summary = "创建绘图草稿")
@ApiResponse(description = "绘图草稿信息")
public Drafting create(@PathVariable Integer id,
@RequestBody @Validated(Creation.class) Drafting drafting) {
return this.draftingRepository.saveAs(id, drafting);
}
@GetMapping("/{id}")
@SecurityRequirement(name = "jwt")
@Operation(summary = "获取绘图草稿数据")
@ -63,6 +71,15 @@ public class DraftingController {
return this.draftingRepository.getById(id);
}
@PutMapping("/{id}")
@SecurityRequirement(name = "jwt")
@Operation(summary = "保存绘图草稿数据")
@ApiResponse(description = "保存成功失败标识")
public boolean updateDrawData(@PathVariable Integer id,
@RequestBody @Validated(Creation.class) Drafting drafting) {
return this.draftingRepository.updateDrawData(id, drafting.getProto());
}
@DeleteMapping("/{id}")
@SecurityRequirement(name = "jwt")
@Operation(summary = "删除绘图草稿数据")

View File

@ -3,6 +3,7 @@ package club.joylink.xiannccda.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.LocalDateTime;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
@ -28,10 +29,11 @@ public class Drafting {
private Integer id;
@Schema(description = "草稿图名称")
@NotBlank(message = "草稿图名称不能为空", groups = {Creation.class})
@NotBlank(message = "草稿图名称不能为空", groups = {Creation.class, SaveAs.class})
private String name;
@Schema(description = "绘图数据")
@NotNull(message = "数据不能为空", groups = {SaveData.class, SaveAs.class})
private byte[] proto;
@Schema(description = "创建时间")
@ -53,4 +55,12 @@ public class Drafting {
public interface Creation {
}
public interface SaveAs {
}
public interface SaveData {
}
}

View File

@ -24,4 +24,8 @@ public interface IDraftingRepository extends IService<Drafting> {
Page<Drafting> pageQuery(DraftingQueryDto query);
Drafting create(Drafting drafting);
boolean updateDrawData(Integer id, byte[] proto);
Drafting saveAs(Integer id, Drafting drafting);
}

View File

@ -2,6 +2,7 @@ package club.joylink.xiannccda.repository.impl;
import club.joylink.xiannccda.dto.DraftingQueryDto;
import club.joylink.xiannccda.entity.Drafting;
import club.joylink.xiannccda.exception.BusinessExceptionAssertEnum;
import club.joylink.xiannccda.mapper.DraftingMapper;
import club.joylink.xiannccda.repository.IDraftingRepository;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -28,10 +29,9 @@ public class DraftingRepository extends ServiceImpl<DraftingMapper, Drafting> im
@Override
public Page<Drafting> pageQuery(DraftingQueryDto query) {
log.info("分页查询参数: {},{},{}", query.getCurrent(), query.getSize(), query.getName());
LambdaQueryWrapper<Drafting> wrapper = Wrappers.lambdaQuery();
// 分页不查询具体proto数据
wrapper.select(info -> !info.getColumn().equals("proto"));
wrapper.select(Drafting.class, info -> !info.getColumn().equals("proto"));
if (StringUtils.hasText(query.getName())) {
wrapper.like(Drafting::getName, query.getName());
}
@ -45,4 +45,32 @@ public class DraftingRepository extends ServiceImpl<DraftingMapper, Drafting> im
this.save(drafting);
return drafting;
}
@Override
public boolean updateDrawData(Integer id, byte[] proto) {
Drafting drafting = new Drafting();
drafting.setId(id);
drafting.setProto(proto);
drafting.setUpdateAt(LocalDateTime.now());
return this.updateById(drafting);
}
@Override
public Drafting saveAs(Integer id, Drafting drafting) {
BusinessExceptionAssertEnum.UNIQUE_FIELD_REPEAT.assertNotTrue(
this.isNameExist(drafting.getName()),
String.format("草稿名称已存在: %s", drafting.getName()));
Drafting old = this.getById(id);
old.setId(null);
old.setName(drafting.getName());
old.setProto(drafting.getProto());
this.save(old);
old.setProto(null); // 另存为数据不回传
return old;
}
public boolean isNameExist(String name) {
return this.count(Wrappers.<Drafting>lambdaQuery().eq(Drafting::getName, name)) > 0;
}
}

View File

@ -0,0 +1,40 @@
package club.joylink.xiannccda.occmock;
import club.joylink.xiannccda.ats.message.FrameSchema;
import club.joylink.xiannccda.ats.message.line3.HeartBeatMsg;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OccHandler extends ByteToMessageCodec<FrameSchema> {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接: {}", ctx.channel().id());
ctx.executor().scheduleAtFixedRate(() -> {
log.info("定时发送心跳消息");
List<FrameSchema> frames = FrameSchema.buildFrom(new HeartBeatMsg());
for (FrameSchema frame : frames) {
ctx.channel().write(frame);
}
ctx.channel().flush();
}, 1, 10, TimeUnit.SECONDS);
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, FrameSchema frameSchema,
ByteBuf byteBuf) throws Exception {
frameSchema.encode(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf,
List<Object> list) throws Exception {
}
}

View File

@ -1,5 +1,65 @@
package club.joylink.xiannccda.occmock;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class OccServer {
public static void main(String[] args) {
final int PORT = 2603;
// Configure the server.
//创建两个EventLoopGroup对象
EventLoopGroup bossGroup = new NioEventLoopGroup(1);//创建boos线程组用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//创建worker线程组用于进行SocketChannel的数据读写处理业务逻辑
//创建Handler
final OccHandler serverHandler = new OccHandler();
try {
//创建ServerBootstrap对象
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//设置EventLoopGroup
.channel(NioServerSocketChannel.class)//设置要被实例化的NioServerSocketChannel类
// .option(ChannelOption.SO_BACKLOG, 100) //设置NioServerSocketChannel的可设置项
.handler(new LoggingHandler(LogLevel.INFO))//设置NioServerSocketChannel的处理器
.childHandler(new ChannelInitializer<SocketChannel>() {//设置处理连入的Client的SocketChannel的处理器
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
// Start the server.
//绑定端口并同步等待成功即启动服务端
ChannelFuture f = b.bind(PORT).addListener(listener -> {
if (listener.isSuccess()) {
System.out.println("OCC测试服务启动...");
}
}).sync();
// Wait until the server socket is closed.
//监听服务端关闭并阻塞等待
//这里并不是关闭服务器而是监听服务端关闭
f.channel().closeFuture().addListener(listener -> {
System.out.println("OCC测试服务关闭");
}).sync();
System.out.println("all after");
} catch (Exception e) {
System.err.println("OCC测试服务异常");
e.printStackTrace();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}