From 690499ecfa4b3485e8271bf6a1a5bc4dc47fe283 Mon Sep 17 00:00:00 2001 From: tiger_zhou Date: Thu, 30 Nov 2023 17:57:59 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=BF=83=E8=B7=B3=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E9=87=8D=E6=96=B0=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 20 +++++- .../ats/message/OccMessageHandler.java | 2 +- .../ats/message/OccTcpClientConnection.java | 67 ++----------------- .../xiannccda/ats/message/ReconnectState.java | 2 +- .../ats/message/XianOccMessagingClient.java | 43 ++++++++++-- 5 files changed, 65 insertions(+), 69 deletions(-) diff --git a/pom.xml b/pom.xml index 6c26b94..0f97ec8 100644 --- a/pom.xml +++ b/pom.xml @@ -63,14 +63,30 @@ springdoc-openapi-starter-webmvc-ui 2.0.2 - + + com.baomidou mybatis-plus-boot-starter ${mybatis-plus-version} + + + + com.zaxxer + HikariCP + + + + + com.zaxxer + HikariCP + 5.1.0 - com.baomidou diff --git a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java index a0b326f..8555a49 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/OccMessageHandler.java @@ -31,7 +31,7 @@ public class OccMessageHandler extends SimpleChannelInboundHandler() { @Override @@ -87,7 +72,7 @@ public class OccTcpClientConnection { ch.pipeline().addLast(new OccMessageHandler(self, collectorData)); } }); - this.timeoutHandler = new HeartBeatTimeoutHandler(this); +// this.timeoutHandler = new HeartBeatTimeoutHandler(this); this.reconnectState = new ReconnectState(); } @@ -117,7 +102,6 @@ public class OccTcpClientConnection { this.connected = true; this.client.requestBaseData(); } - this.reconnectState.resetState(); }); channelFuture.channel().closeFuture().addListener(listener -> { if (listener.isSuccess()) { @@ -125,9 +109,8 @@ public class OccTcpClientConnection { this.connected = false; this.channel = null; this.client.resetRequestBaseDataFlag(); - Thread.sleep(3000); - connect(); } +// this.reconnectState.resetState(); }); } @@ -136,42 +119,4 @@ public class OccTcpClientConnection { } - static class HeartBeatTimeoutHandler { - - /** - * 心跳消息超时 - */ - final int HeartBeatTimeout = 20 * 1000; - static final int Period = 2; - - OccTcpClientConnection connection; - static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor(); - - public HeartBeatTimeoutHandler(OccTcpClientConnection connection) { - this.connection = connection; - - Executor.scheduleWithFixedDelay(() -> { - if (this.connection.connected) { - long ctm = System.currentTimeMillis(); - if (connection.lastReceiveMessageTime + HeartBeatTimeout < ctm) { - log.info("超时未收到OCC消息,尝试断开重连 port :{} 最后一次获取数据时间:{}", this.connection.port, connection.lastReceiveMessageTime); - connection.reconnect(); - } - } - }, Period, Period, TimeUnit.SECONDS); - /*Executor.scheduleAtFixedRate(() -> { - if (connection.connected) { - long ctm = System.currentTimeMillis(); - if (connection.lastReceiveMessageTime + HeartBeatTimeout < ctm) { - log.info("超时未收到OCC消息,尝试断开重连 port :{} 最后一次获取数据时间:{}", this.connection.port, connection.lastReceiveMessageTime); - connection.reconnect(); - } - } else { - connection.reconnect(); - } - }, Period, Period, TimeUnit.SECONDS);*/ - } - } - - } diff --git a/src/main/java/club/joylink/xiannccda/ats/message/ReconnectState.java b/src/main/java/club/joylink/xiannccda/ats/message/ReconnectState.java index febaa92..8325eaa 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/ReconnectState.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/ReconnectState.java @@ -7,7 +7,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ReconnectState { - private final static int RECONN_TIME_OUT = 5 * 1000; + private final static int RECONN_TIME_OUT = 3 * 1000; private AtomicBoolean isReconn = new AtomicBoolean(false); @Setter private Long reconnTime; diff --git a/src/main/java/club/joylink/xiannccda/ats/message/XianOccMessagingClient.java b/src/main/java/club/joylink/xiannccda/ats/message/XianOccMessagingClient.java index e56202a..68e21fc 100644 --- a/src/main/java/club/joylink/xiannccda/ats/message/XianOccMessagingClient.java +++ b/src/main/java/club/joylink/xiannccda/ats/message/XianOccMessagingClient.java @@ -4,7 +4,10 @@ import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest; import club.joylink.xiannccda.ats.message.line3.req.LoadHistoryTGDataRequest.ApplyTypeEnum; import java.sql.Time; import java.time.LocalDateTime; +import java.util.LinkedList; import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; @@ -37,6 +40,8 @@ public class XianOccMessagingClient { private final Long requestBaseTime; + private final ConnectionTimeOutHandler timeOutHandler = new ConnectionTimeOutHandler(); + public void send(MessageData md, boolean isRealTime) { OccTcpClientConnection conn = isRealTime ? rtConnection : nrtConnection; if (conn.connected) { @@ -55,6 +60,9 @@ public class XianOccMessagingClient { // 创建实时和非实时消息连接 this.rtConnection = new OccTcpClientConnection(this, host, realPort, collectorData); this.nrtConnection = new OccTcpClientConnection(this, host, unRealPort, false); + this.timeOutHandler.addConnection(this.rtConnection); + this.timeOutHandler.addConnection(this.nrtConnection); + } @@ -85,10 +93,7 @@ public class XianOccMessagingClient { * 连接OCC服务 */ public void connect() { - // 实时消息连接 - this.rtConnection.connect(); - // 非实时消息连接 - this.nrtConnection.connect(); + this.timeOutHandler.start(); } @Getter @@ -102,4 +107,34 @@ public class XianOccMessagingClient { private Boolean connectioned; private Long lastReceiveMessageTime; } + + public static class ConnectionTimeOutHandler { + + final int HeartBeatTimeout = 20 * 1000; + static final int Period = 2; + + static final ScheduledExecutorService Executor = Executors.newSingleThreadScheduledExecutor(); + private final LinkedList clientConnections = new LinkedList<>(); + + public void addConnection(OccTcpClientConnection conn) { + this.clientConnections.add(conn); + } + + public void start() { + Executor.scheduleWithFixedDelay(() -> { + for (OccTcpClientConnection cc : this.clientConnections) { + if (cc.connected) { + long ctm = System.currentTimeMillis(); + if (cc.lastReceiveMessageTime + HeartBeatTimeout < ctm) { + log.info("超时未收到OCC消息,尝试断开重连 port :{} 最后一次获取数据时间:{}", cc.port, cc.lastReceiveMessageTime); + cc.reconnect(); + } + } else { + cc.reconnect(); + } + } + }, Period, Period, TimeUnit.SECONDS); + } + + } }