调整心跳超时重新连接

This commit is contained in:
tiger_zhou 2023-11-30 17:57:59 +08:00
parent a1bcfc156a
commit 690499ecfa
5 changed files with 65 additions and 69 deletions

20
pom.xml
View File

@ -63,14 +63,30 @@
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.0.2</version>
</dependency>
<!--设备状态采集 cpu,内存,硬盘等-->
<!--<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>6.4.8</version>
</dependency>-->
<!-- mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus-version}</version>
<!--排除5.0.1的数据库连接池,下面引用最新的-->
<exclusions>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.1.0</version>
</dependency>
<!-- mybatis-plus 代码生成 -->
<dependency>
<groupId>com.baomidou</groupId>

View File

@ -31,7 +31,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler<List<MessageD
this.connection = connection;
this.collectorData = collectorData;
log.info(" port[{}] ,收集消息数据[{}]", this.connection.port, this.collectorData);
// log.info(" port[{}] ,收集消息数据[{}]", this.connection.port, this.collectorData);
}
static {

View File

@ -1,31 +1,19 @@
package club.joylink.xiannccda.ats.message;
import club.joylink.xiannccda.ats.message.collect.DeviceDataRepository;
import club.joylink.xiannccda.ats.message.collect.DeviceDataRepository.DataTypeEnum;
import club.joylink.xiannccda.ats.message.collect.datasource.InUsedScheduleData;
import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest;
import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest.ApplyTypeEnum;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import oshi.SystemInfo;
@Slf4j
public class OccTcpClientConnection {
@ -47,7 +35,7 @@ public class OccTcpClientConnection {
/**
* 心跳超时重连处理器
*/
HeartBeatTimeoutHandler timeoutHandler;
// HeartBeatTimeoutHandler timeoutHandler;
/**
* 最后一次收到消息时间,重新连接的时候也会更新此参数
*/
@ -61,11 +49,9 @@ public class OccTcpClientConnection {
final ReconnectState reconnectState;
public void write(MessageData messageData) {
/*if (Objects.nonNull(this.channel)) {
this.channel.close();
this.channel = null;
}*/
this.channel.writeAndFlush(List.of(messageData));
if (Objects.nonNull(this.channel)) {
this.channel.writeAndFlush(List.of(messageData));
}
}
public OccTcpClientConnection(XianOccMessagingClient client, String host, int port, boolean collectorData) {
@ -77,7 +63,6 @@ public class OccTcpClientConnection {
this.bootstrap = new Bootstrap();
OccTcpClientConnection self = this;
bootstrap.group(group)
// .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<>() {
@Override
@ -87,7 +72,7 @@ public class OccTcpClientConnection {
ch.pipeline().addLast(new OccMessageHandler(self, collectorData));
}
});
this.timeoutHandler = new HeartBeatTimeoutHandler(this);
// this.timeoutHandler = new HeartBeatTimeoutHandler(this);
this.reconnectState = new ReconnectState();
}
@ -117,7 +102,6 @@ public class OccTcpClientConnection {
this.connected = true;
this.client.requestBaseData();
}
this.reconnectState.resetState();
});
channelFuture.channel().closeFuture().addListener(listener -> {
if (listener.isSuccess()) {
@ -125,9 +109,8 @@ public class OccTcpClientConnection {
this.connected = false;
this.channel = null;
this.client.resetRequestBaseDataFlag();
Thread.sleep(3000);
connect();
}
// this.reconnectState.resetState();
});
}
@ -136,42 +119,4 @@ public class OccTcpClientConnection {
}
static class HeartBeatTimeoutHandler {
/**
* 心跳消息超时
*/
final int HeartBeatTimeout = 20 * 1000;
static final int Period = 2;
OccTcpClientConnection connection;
static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor();
public HeartBeatTimeoutHandler(OccTcpClientConnection connection) {
this.connection = connection;
Executor.scheduleWithFixedDelay(() -> {
if (this.connection.connected) {
long ctm = System.currentTimeMillis();
if (connection.lastReceiveMessageTime + HeartBeatTimeout < ctm) {
log.info("超时未收到OCC消息尝试断开重连 port :{} 最后一次获取数据时间:{}", this.connection.port, connection.lastReceiveMessageTime);
connection.reconnect();
}
}
}, Period, Period, TimeUnit.SECONDS);
/*Executor.scheduleAtFixedRate(() -> {
if (connection.connected) {
long ctm = System.currentTimeMillis();
if (connection.lastReceiveMessageTime + HeartBeatTimeout < ctm) {
log.info("超时未收到OCC消息尝试断开重连 port :{} 最后一次获取数据时间:{}", this.connection.port, connection.lastReceiveMessageTime);
connection.reconnect();
}
} else {
connection.reconnect();
}
}, Period, Period, TimeUnit.SECONDS);*/
}
}
}

View File

@ -7,7 +7,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ReconnectState {
private final static int RECONN_TIME_OUT = 5 * 1000;
private final static int RECONN_TIME_OUT = 3 * 1000;
private AtomicBoolean isReconn = new AtomicBoolean(false);
@Setter
private Long reconnTime;

View File

@ -4,7 +4,10 @@ import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest;
import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest.ApplyTypeEnum;
import java.sql.Time;
import java.time.LocalDateTime;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
@ -37,6 +40,8 @@ public class XianOccMessagingClient {
private final Long requestBaseTime;
private final ConnectionTimeOutHandler timeOutHandler = new ConnectionTimeOutHandler();
public void send(MessageData md, boolean isRealTime) {
OccTcpClientConnection conn = isRealTime ? rtConnection : nrtConnection;
if (conn.connected) {
@ -55,6 +60,9 @@ public class XianOccMessagingClient {
// 创建实时和非实时消息连接
this.rtConnection = new OccTcpClientConnection(this, host, realPort, collectorData);
this.nrtConnection = new OccTcpClientConnection(this, host, unRealPort, false);
this.timeOutHandler.addConnection(this.rtConnection);
this.timeOutHandler.addConnection(this.nrtConnection);
}
@ -85,10 +93,7 @@ public class XianOccMessagingClient {
* 连接OCC服务
*/
public void connect() {
// 实时消息连接
this.rtConnection.connect();
// 非实时消息连接
this.nrtConnection.connect();
this.timeOutHandler.start();
}
@Getter
@ -102,4 +107,34 @@ public class XianOccMessagingClient {
private Boolean connectioned;
private Long lastReceiveMessageTime;
}
public static class ConnectionTimeOutHandler {
final int HeartBeatTimeout = 20 * 1000;
static final int Period = 2;
static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor();
private final LinkedList<OccTcpClientConnection> clientConnections = new LinkedList<>();
public void addConnection(OccTcpClientConnection conn) {
this.clientConnections.add(conn);
}
public void start() {
Executor.scheduleWithFixedDelay(() -> {
for (OccTcpClientConnection cc : this.clientConnections) {
if (cc.connected) {
long ctm = System.currentTimeMillis();
if (cc.lastReceiveMessageTime + HeartBeatTimeout < ctm) {
log.info("超时未收到OCC消息尝试断开重连 port :{} 最后一次获取数据时间:{}", cc.port, cc.lastReceiveMessageTime);
cc.reconnect();
}
} else {
cc.reconnect();
}
}
}, Period, Period, TimeUnit.SECONDS);
}
}
}