This commit is contained in:
xzb 2023-06-07 14:32:18 +08:00
commit 9117df92bf
6 changed files with 97 additions and 25 deletions

View File

@ -3,6 +3,7 @@ package club.joylink.xiannccda.ats.message;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.Setter; import lombok.Setter;
@ -28,7 +29,7 @@ public abstract class MessageData {
public MessageData(MessageId msgId, int contentLength) { public MessageData(MessageId msgId, int contentLength) {
this.length = 8 + contentLength; this.length = 8 + contentLength;
this.time = System.currentTimeMillis(); this.time = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
this.version = 0x01; this.version = 0x01;
this.msgId = msgId; this.msgId = msgId;
} }

View File

@ -9,18 +9,19 @@ import java.util.List;
public class OccMessageDecoder extends ByteToMessageDecoder { public class OccMessageDecoder extends ByteToMessageDecoder {
final XianOccMessagingClient client; final TcpClientConnection connection;
public OccMessageDecoder(XianOccMessagingClient client) { public OccMessageDecoder(TcpClientConnection connection) {
this.client = client; this.connection = connection;
} }
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
connection.lastReceiveMessageTime = System.currentTimeMillis();
List<MessageData> messages = FrameSchema.decode(in); List<MessageData> messages = FrameSchema.decode(in);
if (!(messages == null || messages.size() == 0)) { if (!(messages == null || messages.size() == 0)) {
System.out.println( System.out.println(
String.format("收到消息: %s", String.format("收到OCC消息: %s",
JSON.toJSONString(messages.get(0), Feature.PrettyFormat, Feature.FieldBased, JSON.toJSONString(messages.get(0), Feature.PrettyFormat, Feature.FieldBased,
Feature.WriteNulls))); Feature.WriteNulls)));
out.add(messages); out.add(messages);

View File

@ -9,10 +9,10 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class OccMessageEncoder extends MessageToByteEncoder<List<MessageData>> { public class OccMessageEncoder extends MessageToByteEncoder<List<MessageData>> {
final XianOccMessagingClient client; final TcpClientConnection connection;
public OccMessageEncoder(XianOccMessagingClient client) { public OccMessageEncoder(TcpClientConnection connection) {
this.client = client; this.connection = connection;
} }
@Override @Override

View File

@ -7,21 +7,34 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class TcpClientConnection { public class TcpClientConnection {
boolean autoReconnect = true;
XianOccMessagingClient client; XianOccMessagingClient client;
final String host; final String host;
final int port; final int port;
volatile boolean connected;
final EventLoopGroup group; final EventLoopGroup group;
final Bootstrap bootstrap; final Bootstrap bootstrap;
volatile boolean connected;
/**
* 连接管道
*/
Channel channel;
HeartBeatTimeoutHandler timeoutHandler;
/**
* 最后一次收到消息时间
*/
volatile long lastReceiveMessageTime;
public TcpClientConnection(XianOccMessagingClient client, String host, int port) { public TcpClientConnection(XianOccMessagingClient client, String host, int port) {
this.client = client; this.client = client;
@ -29,34 +42,91 @@ public class TcpClientConnection {
this.port = port; this.port = port;
this.group = new NioEventLoopGroup(); this.group = new NioEventLoopGroup();
this.bootstrap = new Bootstrap(); this.bootstrap = new Bootstrap();
TcpClientConnection self = this;
bootstrap.group(group) bootstrap.group(group)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.handler(new ChannelInitializer<>() { .handler(new ChannelInitializer<>() {
@Override @Override
protected void initChannel(Channel ch) { protected void initChannel(Channel ch) {
ch.pipeline().addLast(new OccMessageEncoder(client)); ch.pipeline().addLast(new OccMessageEncoder(self));
ch.pipeline().addLast(new OccMessageDecoder(client)); ch.pipeline().addLast(new OccMessageDecoder(self));
} }
}); });
this.timeoutHandler = new HeartBeatTimeoutHandler(this);
} }
public void connect() { void reconnect() {
if (this.channel != null) {
this.channel.close();
} else {
this.connect();
}
}
public synchronized void connect() {
try { try {
ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).addListener(future1 -> { ChannelFuture channelFuture = bootstrap.connect(this.host, this.port);
channelFuture.addListener(future1 -> {
if (future1.isSuccess()) { if (future1.isSuccess()) {
log.info("连接到OCC服务: host={}, port={}", this.host, this.port); log.info("连接到OCC服务: host={}, port={}", this.host, this.port);
} else { this.connected = true;
log.info("连接OCC失败尝试重连"); this.channel = channelFuture.channel();
connect(); // 启动心跳超时监测
this.timeoutHandler.start();
} }
}); });
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().addListener(listener -> {
log.info("与服务断连,尝试重连");
this.timeoutHandler.stop();
this.connected = false;
this.channel = null;
Thread.sleep(3000);
connect();
});
} catch (Exception e) { } catch (Exception e) {
log.error("与OCC服务连接异常", e); log.error("与OCC服务连接异常尝试重连", e);
connect();
} }
// finally {
// log.info("关闭eventLoop");
// group.shutdownGracefully();
// }
} }
static class HeartBeatTimeoutHandler {
/**
* 心跳消息超时
*/
final int HeartBeatTimeout = 15 * 1000;
static final int Period = 2;
TcpClientConnection connection;
boolean running;
static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor();
public HeartBeatTimeoutHandler(TcpClientConnection connection) {
this.connection = connection;
Executor.scheduleAtFixedRate(() -> {
if (running) {
long ctm = System.currentTimeMillis();
if (connection.lastReceiveMessageTime + HeartBeatTimeout < ctm) {
log.info("超时未收到OCC消息尝试重连: host={}, port={}", connection.host,
connection.port);
connection.reconnect();
}
}
}, Period, Period, TimeUnit.SECONDS);
}
void start() {
// log.info("心跳超时监控启动");
this.running = true;
connection.lastReceiveMessageTime = System.currentTimeMillis();
}
void stop() {
// log.info("心跳超时监控stop");
this.running = false;
}
}
} }

View File

@ -36,6 +36,7 @@ public class XianOccMessagingClient {
final TcpClientConnection connection; final TcpClientConnection connection;
public XianOccMessagingClient(int lineId, String host) { public XianOccMessagingClient(int lineId, String host) {
this.host = host; this.host = host;
this.lineId = lineId; this.lineId = lineId;

View File

@ -41,7 +41,7 @@ public class OccServer {
if (listener.isSuccess()) { if (listener.isSuccess()) {
System.out.println("OCC测试服务启动..."); System.out.println("OCC测试服务启动...");
} }
}).sync(); });
// Wait until the server socket is closed. // Wait until the server socket is closed.
//监听服务端关闭并阻塞等待 //监听服务端关闭并阻塞等待
@ -49,7 +49,6 @@ public class OccServer {
f.channel().closeFuture().addListener(listener -> { f.channel().closeFuture().addListener(listener -> {
System.out.println("OCC测试服务关闭"); System.out.println("OCC测试服务关闭");
}).sync(); }).sync();
System.out.println("all after");
} catch (Exception e) { } catch (Exception e) {
System.err.println("OCC测试服务异常"); System.err.println("OCC测试服务异常");
e.printStackTrace(); e.printStackTrace();