添加心跳监测断线重连逻辑
This commit is contained in:
parent
0333c5d220
commit
6027607c21
@ -3,6 +3,7 @@ package club.joylink.xiannccda.ats.message;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
@ -28,7 +29,7 @@ public abstract class MessageData {
|
||||
|
||||
public MessageData(MessageId msgId, int contentLength) {
|
||||
this.length = 8 + contentLength;
|
||||
this.time = System.currentTimeMillis();
|
||||
this.time = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
|
||||
this.version = 0x01;
|
||||
this.msgId = msgId;
|
||||
}
|
||||
|
@ -9,18 +9,19 @@ import java.util.List;
|
||||
|
||||
public class OccMessageDecoder extends ByteToMessageDecoder {
|
||||
|
||||
final XianOccMessagingClient client;
|
||||
final TcpClientConnection connection;
|
||||
|
||||
public OccMessageDecoder(XianOccMessagingClient client) {
|
||||
this.client = client;
|
||||
public OccMessageDecoder(TcpClientConnection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
connection.lastReceiveMessageTime = System.currentTimeMillis();
|
||||
List<MessageData> messages = FrameSchema.decode(in);
|
||||
if (!(messages == null || messages.size() == 0)) {
|
||||
System.out.println(
|
||||
String.format("收到消息: %s",
|
||||
String.format("收到OCC消息: %s",
|
||||
JSON.toJSONString(messages.get(0), Feature.PrettyFormat, Feature.FieldBased,
|
||||
Feature.WriteNulls)));
|
||||
out.add(messages);
|
||||
|
@ -9,10 +9,10 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Slf4j
|
||||
public class OccMessageEncoder extends MessageToByteEncoder<List<MessageData>> {
|
||||
|
||||
final XianOccMessagingClient client;
|
||||
final TcpClientConnection connection;
|
||||
|
||||
public OccMessageEncoder(XianOccMessagingClient client) {
|
||||
this.client = client;
|
||||
public OccMessageEncoder(TcpClientConnection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,21 +7,34 @@ import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TcpClientConnection {
|
||||
|
||||
boolean autoReconnect = true;
|
||||
|
||||
XianOccMessagingClient client;
|
||||
final String host;
|
||||
final int port;
|
||||
|
||||
volatile boolean connected;
|
||||
|
||||
final EventLoopGroup group;
|
||||
final Bootstrap bootstrap;
|
||||
volatile boolean connected;
|
||||
/**
|
||||
* 连接管道
|
||||
*/
|
||||
Channel channel;
|
||||
HeartBeatTimeoutHandler timeoutHandler;
|
||||
|
||||
/**
|
||||
* 最后一次收到消息时间
|
||||
*/
|
||||
volatile long lastReceiveMessageTime;
|
||||
|
||||
public TcpClientConnection(XianOccMessagingClient client, String host, int port) {
|
||||
this.client = client;
|
||||
@ -29,34 +42,91 @@ public class TcpClientConnection {
|
||||
this.port = port;
|
||||
this.group = new NioEventLoopGroup();
|
||||
this.bootstrap = new Bootstrap();
|
||||
TcpClientConnection self = this;
|
||||
bootstrap.group(group)
|
||||
.channel(NioSocketChannel.class)
|
||||
.handler(new ChannelInitializer<>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) {
|
||||
ch.pipeline().addLast(new OccMessageEncoder(client));
|
||||
ch.pipeline().addLast(new OccMessageDecoder(client));
|
||||
ch.pipeline().addLast(new OccMessageEncoder(self));
|
||||
ch.pipeline().addLast(new OccMessageDecoder(self));
|
||||
}
|
||||
});
|
||||
|
||||
this.timeoutHandler = new HeartBeatTimeoutHandler(this);
|
||||
}
|
||||
|
||||
public void connect() {
|
||||
void reconnect() {
|
||||
if (this.channel != null) {
|
||||
this.channel.close();
|
||||
} else {
|
||||
this.connect();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void connect() {
|
||||
try {
|
||||
ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).addListener(future1 -> {
|
||||
ChannelFuture channelFuture = bootstrap.connect(this.host, this.port);
|
||||
channelFuture.addListener(future1 -> {
|
||||
if (future1.isSuccess()) {
|
||||
log.info("连接到OCC服务: host={}, port={}", this.host, this.port);
|
||||
} else {
|
||||
log.info("连接OCC失败,尝试重连");
|
||||
connect();
|
||||
this.connected = true;
|
||||
this.channel = channelFuture.channel();
|
||||
// 启动心跳超时监测
|
||||
this.timeoutHandler.start();
|
||||
}
|
||||
});
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
channelFuture.channel().closeFuture().addListener(listener -> {
|
||||
log.info("与服务断连,尝试重连");
|
||||
this.timeoutHandler.stop();
|
||||
this.connected = false;
|
||||
this.channel = null;
|
||||
|
||||
Thread.sleep(3000);
|
||||
connect();
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("与OCC服务连接异常", e);
|
||||
log.error("与OCC服务连接异常,尝试重连", e);
|
||||
connect();
|
||||
}
|
||||
// finally {
|
||||
// log.info("关闭eventLoop");
|
||||
// group.shutdownGracefully();
|
||||
// }
|
||||
}
|
||||
|
||||
static class HeartBeatTimeoutHandler {
|
||||
|
||||
/**
|
||||
* 心跳消息超时
|
||||
*/
|
||||
final int HeartBeatTimeout = 15 * 1000;
|
||||
static final int Period = 2;
|
||||
|
||||
TcpClientConnection connection;
|
||||
boolean running;
|
||||
static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
public HeartBeatTimeoutHandler(TcpClientConnection connection) {
|
||||
this.connection = connection;
|
||||
Executor.scheduleAtFixedRate(() -> {
|
||||
if (running) {
|
||||
long ctm = System.currentTimeMillis();
|
||||
if (connection.lastReceiveMessageTime + HeartBeatTimeout < ctm) {
|
||||
log.info("超时未收到OCC消息,尝试重连: host={}, port={}", connection.host,
|
||||
connection.port);
|
||||
connection.reconnect();
|
||||
}
|
||||
}
|
||||
}, Period, Period, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
void start() {
|
||||
// log.info("心跳超时监控启动");
|
||||
this.running = true;
|
||||
connection.lastReceiveMessageTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
void stop() {
|
||||
// log.info("心跳超时监控stop");
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ public class XianOccMessagingClient {
|
||||
|
||||
final TcpClientConnection connection;
|
||||
|
||||
|
||||
public XianOccMessagingClient(int lineId, String host) {
|
||||
this.host = host;
|
||||
this.lineId = lineId;
|
||||
|
@ -41,7 +41,7 @@ public class OccServer {
|
||||
if (listener.isSuccess()) {
|
||||
System.out.println("OCC测试服务启动...");
|
||||
}
|
||||
}).sync();
|
||||
});
|
||||
|
||||
// Wait until the server socket is closed.
|
||||
//监听服务端关闭,并阻塞等待
|
||||
@ -49,7 +49,6 @@ public class OccServer {
|
||||
f.channel().closeFuture().addListener(listener -> {
|
||||
System.out.println("OCC测试服务关闭");
|
||||
}).sync();
|
||||
System.out.println("all after");
|
||||
} catch (Exception e) {
|
||||
System.err.println("OCC测试服务异常");
|
||||
e.printStackTrace();
|
||||
|
Loading…
Reference in New Issue
Block a user