OCC消息客户端对象抽象初版

This commit is contained in:
walker 2023-06-05 18:14:57 +08:00
parent ecf561d015
commit ea4cfdd042
9 changed files with 263 additions and 1 deletions

View File

@ -7,7 +7,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<!--<version>2.3.5.RELEASE</version>-->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>club.joylink</groupId>
@ -43,6 +42,11 @@
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>

View File

@ -0,0 +1,34 @@
package club.joylink.xiannccda.ats.message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FrameHandler extends ByteToMessageCodec<FrameSchema> {
final XianOccMessagingClient client;
public FrameHandler(XianOccMessagingClient client) {
this.client = client;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, FrameSchema frameSchema,
ByteBuf byteBuf) throws Exception {
log.info("发送消息到OCC");
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf,
List<Object> list) throws Exception {
log.info("收到OCC消息");
}
}

View File

@ -0,0 +1,27 @@
package club.joylink.xiannccda.ats.message;
import io.netty.buffer.ByteBuf;
import java.util.List;
public class FrameSchema {
/**
* (System ID)占1字节系统中每台计算机均有唯一的System ID, 此处表示发送方的System ID为0xff
*/
short systemId;
/**
* (Total Length) 占2字节表示每个信息帧中消息的总长度+1字节Multi-flag 最长为1025字节如果数据长度大于1024字节剩余的在紧接着的几帧数据中发送
*/
int totalLength;
/**
* (Multi-flag)为帧标识位占1字节值为0或者1分别表示下面的意思 1表示在这帧数据中一个完整的消息没有发送结束有后续帧
* 0表示在这帧数据中消息完整发送没有后续帧同样几个消息可合并成一帧数据发送
*/
short multiFlag;
/**
* 消息内容最长为1024字节它可由一个或多个消息组成也可以是一个消息的一部分每个消息由消息标识和消息数据组成
*/
ByteBuf packageData;
List<MessageData> messages;
}

View File

@ -0,0 +1,18 @@
package club.joylink.xiannccda.ats.message;
public class MessageData {
/**
* 消息的长度: 时间戳Time长度+ 版本号Version长度+消息IDmessage _id长度2字节+ 消息内容content长度
*/
int length;
/**
* 4字节时间戳为UTC时间
*/
long time;
/**
* 用来区分协议的版本本版指定为01H
*/
int version;
MessageId msgId;
}

View File

@ -0,0 +1,25 @@
package club.joylink.xiannccda.ats.message;
public enum MessageId {
UNKNOWN(0x0000),
/**
* 心跳
*/
MESSAGE_POLLING(0x0001),
;
int val;
MessageId(int val) {
this.val = val;
}
public static MessageId of(int val) {
for (MessageId messageId : MessageId.values()) {
if (messageId.val == val) {
return messageId;
}
}
return MessageId.UNKNOWN;
}
}

View File

@ -0,0 +1,25 @@
package club.joylink.xiannccda.ats.message;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* OCC客户端管理
*/
@Component
public class OccMessageManage implements ApplicationRunner {
Map<Integer, XianOccMessagingClient> clientMap = new HashMap<>();
public void registerClient(XianOccMessagingClient client) {
clientMap.put(client.lineId, client);
}
@Override
public void run(ApplicationArguments args) throws Exception {
// 读取数据配置创建客户端
}
}

View File

@ -0,0 +1,60 @@
package club.joylink.xiannccda.ats.message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TcpClientConnection {
boolean autoReconnect = true;
XianOccMessagingClient client;
final String host;
final int port;
volatile boolean connected;
final EventLoopGroup group;
final Bootstrap bootstrap;
public TcpClientConnection(XianOccMessagingClient client, String host, int port) {
this.client = client;
this.host = host;
this.port = port;
this.group = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new FrameHandler(client));
}
});
}
public void connect() {
try {
ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).addListener(future1 -> {
if (future1.isSuccess()) {
log.info("连接到OCC服务: host={}, port={}", this.host, this.port);
} else {
log.info("连接OCC失败尝试重连");
connect();
}
});
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("与OCC服务连接异常", e);
} finally {
log.info("关闭eventLoop");
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,62 @@
package club.joylink.xiannccda.ats.message;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class XianOccMessagingClient {
final int realTimePortBase = 2600;
final int nonRealTimePortBase = 2700;
/**
* 线路号 实时信息的侦听端口号为2600+line_id非实时信息的侦听端口号为2700+line_id
* (如对于地铁1号线来说实时信息的侦听端口号为2601非实时信息的侦听端口号为2701)
*/
final int lineId;
final String host;
/**
* 实时端口号
*/
final int realTimePort;
/**
* 非实时端口号
*/
final int nonRealTimePort;
/**
* 版本号
*/
final int Version = 0x01;
/**
* 心跳发送间隔
*/
final int heartbeatInterval = 10;
/**
* 心跳超时
*/
final int heartbeatTimeout = 15;
final TcpClientConnection connection;
public XianOccMessagingClient(int lineId, String host) {
this.host = host;
this.lineId = lineId;
this.realTimePort = realTimePortBase + lineId;
this.nonRealTimePort = nonRealTimePortBase + lineId;
// 创建实时消息连接
this.connection = new TcpClientConnection(this, host, this.realTimePort);
}
/**
* 连接实时消息服务
*/
public void rtMessageConnect() {
}
/**
* 连接非实时消息服务
*/
public void nrtMessageConnect() {
}
}

View File

@ -0,0 +1,7 @@
package club.joylink.xiannccda.ats.message.line3;
import club.joylink.xiannccda.ats.message.MessageData;
public class HeartBeatMsg extends MessageData {
}