From 9abb18e3b6e6333549314597a7678abc52c2bd0f Mon Sep 17 00:00:00 2001 From: walker Date: Wed, 24 Jan 2024 13:25:00 +0800 Subject: [PATCH] =?UTF-8?q?udp=E6=9C=8D=E5=8A=A1=E5=81=9C=E6=AD=A2?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bj-rtss-message | 2 +- rtss_simulation | 2 +- third_party/dynamics/dynamics.go | 25 +++++++++++------ .../semi_physical_train.go | 3 +-- third_party/third_party.go | 4 ++- third_party/udp/udp_client.go | 12 ++++++--- third_party/udp/udp_server.go | 27 +++++++++++-------- ts/test_simulation_manage.go | 2 +- 8 files changed, 48 insertions(+), 29 deletions(-) diff --git a/bj-rtss-message b/bj-rtss-message index bc70ae8..7b637f0 160000 --- a/bj-rtss-message +++ b/bj-rtss-message @@ -1 +1 @@ -Subproject commit bc70ae80a0ed73ec417884710e02996103841bf9 +Subproject commit 7b637f0f519e0c6f5fa6817546f70000ac22fd10 diff --git a/rtss_simulation b/rtss_simulation index fa7211d..247003d 160000 --- a/rtss_simulation +++ b/rtss_simulation @@ -1 +1 @@ -Subproject commit fa7211d7bdf68be1df571f222a047ddd6cd764b0 +Subproject commit 247003d00b1d1c37e467242ac9b3c35e012b0eb8 diff --git a/third_party/dynamics/dynamics.go b/third_party/dynamics/dynamics.go index 83f80ec..09cf4b9 100644 --- a/third_party/dynamics/dynamics.go +++ b/third_party/dynamics/dynamics.go @@ -97,6 +97,7 @@ func (d *dynamics) Name() string { // 解码列车信息并处理 func (d *dynamics) handleDynamicsTrainInfo(b []byte) { d.udpDelayRecorder.RecordInterval() + // slog.Debug("动力学列车信息近期消息间隔", "intervals", d.udpDelayRecorder.GetIntervals()) trainInfo := &message.DynamicsTrainInfo{} err := trainInfo.Decode(b) if err != nil { @@ -230,12 +231,16 @@ func (d *dynamics) Start(manager DynamicsMessageManager) error { } d.manager = manager // 初始化客户端信息 - d.initDynamics() + err := d.initDynamics() + if err != nil { + d.Stop() + return err + } // 初始化运行资源 - err := d.initDynamicsRunRepository() + err = d.initDynamicsRunRepository() if err != nil { d.Stop() // 发送错误后将信息销毁 - panic(err) + return err } ctx, cancle := context.WithCancel(context.Background()) go d.sendTurnoutStateTask(ctx) @@ -246,13 +251,13 @@ func (d *dynamics) Start(manager DynamicsMessageManager) error { } // 初始化客户端、服务等信息 -func (d *dynamics) initDynamics() { +func (d *dynamics) initDynamics() error { d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort)) d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort)) d.baseUrl = getUrlBase(d.runConfig) d.httpClient = &http.Client{Timeout: time.Second * 5} d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo) - d.trainInfoUdpServer.Listen() + return d.trainInfoUdpServer.Listen() } // 动力学运行所需数据 @@ -268,23 +273,27 @@ func (d *dynamics) initDynamicsRunRepository() error { func (d *dynamics) Stop() { initMutex.Lock() defer initMutex.Unlock() + slog.Debug("动力学服务停止") // 停止网络监听 d.udpDelayRecorder.Stop() + if d.turnoutTaskCancel != nil { + d.turnoutTaskCancel() + } if d.httpClient != nil { d.requestStopSimulation() d.httpClient = nil } if d.turnoutStateUdpClient != nil { d.turnoutStateUdpClient.Close() + d.turnoutStateUdpClient = nil } if d.trainControlUdpClient != nil { d.trainControlUdpClient.Close() + d.trainControlUdpClient = nil } if d.trainInfoUdpServer != nil { d.trainInfoUdpServer.Close() - } - if d.turnoutTaskCancel != nil { - d.turnoutTaskCancel() + d.trainInfoUdpServer = nil } d.manager = nil d.updateState(tpapi.ThirdPartyState_Closed) diff --git a/third_party/semi_physical_train/semi_physical_train.go b/third_party/semi_physical_train/semi_physical_train.go index 91d47bd..144ceba 100644 --- a/third_party/semi_physical_train/semi_physical_train.go +++ b/third_party/semi_physical_train/semi_physical_train.go @@ -2,7 +2,6 @@ package semi_physical_train import ( "fmt" - "log/slog" "sync" "joylink.club/bj-rtsts-server/config" @@ -55,7 +54,7 @@ func (s *semiPhysicalTrainImpl) Name() string { func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) { s.udpDelayRecorder.RecordInterval() - slog.Debug(fmt.Sprintf("半实物列车控制消息近期消息间隔: %v", s.udpDelayRecorder.GetIntervals())) + // slog.Debug(fmt.Sprintf("半实物列车控制消息近期消息间隔: %v", s.udpDelayRecorder.GetIntervals())) handler := s.manager if handler != nil { handler.HandleSemiPhysicalTrainControlMsg(b) diff --git a/third_party/third_party.go b/third_party/third_party.go index 5b7a334..f1d266a 100644 --- a/third_party/third_party.go +++ b/third_party/third_party.go @@ -28,8 +28,9 @@ func convertServiceName(name string) state_proto.SimulationThirdPartyApiService_ return state_proto.SimulationThirdPartyApiService_Dynamics case semi_physical_train.Name: return state_proto.SimulationThirdPartyApiService_SemiPhysicalTrain + default: + return state_proto.SimulationThirdPartyApiService_Undefined } - return state_proto.SimulationThirdPartyApiService_Undefined } func GetRunningServiceStates() *state_proto.SimulationThirdPartyApiService { @@ -38,6 +39,7 @@ func GetRunningServiceStates() *state_proto.SimulationThirdPartyApiService { t := convertServiceName(tpas.Name()) if t == state_proto.SimulationThirdPartyApiService_Undefined { slog.Error("未知的第三方接口服务类型", "name", tpas.Name()) + continue } switch tpas.State() { case tpapi.ThirdPartyState_Normal: diff --git a/third_party/udp/udp_client.go b/third_party/udp/udp_client.go index a7e5d56..4525d26 100644 --- a/third_party/udp/udp_client.go +++ b/third_party/udp/udp_client.go @@ -6,8 +6,8 @@ import ( ) type UdpClient interface { - SendMsg(msg UdpMessageEncoder) - Send(b []byte) + SendMsg(msg UdpMessageEncoder) error + Send(b []byte) error Close() } @@ -62,20 +62,24 @@ func NewClientWithLocalAddr(remoteAddr string, localAddr string) UdpClient { return c } -func (c *client) SendMsg(msg UdpMessageEncoder) { +func (c *client) SendMsg(msg UdpMessageEncoder) error { b := msg.Encode() _, err := c.conn.Write(b) if err != nil { slog.Error("udp client send error", "error", err) + return err } + return nil // slog.Debug("udp client send", "size", n) } -func (c *client) Send(b []byte) { +func (c *client) Send(b []byte) error { _, err := c.conn.Write(b) if err != nil { slog.Error("udp client send error", "error", err) + return err } + return nil // slog.Debug("udp client send", "size", n) } diff --git a/third_party/udp/udp_server.go b/third_party/udp/udp_server.go index 7e702e2..b2f19b8 100644 --- a/third_party/udp/udp_server.go +++ b/third_party/udp/udp_server.go @@ -8,7 +8,7 @@ import ( ) type UdpServer interface { - Listen() + Listen() error Close() } @@ -19,44 +19,49 @@ type server struct { conn *net.UDPConn handler UdpMsgHandler cancelFn context.CancelFunc + done chan struct{} // 服务协程退出信号 } // NewServer creates a new instance of UdpServer. func NewServer(addr string, handler UdpMsgHandler) UdpServer { - return &server{addr: addr, handler: handler} + return &server{addr: addr, handler: handler, done: make(chan struct{})} } -func (s *server) Listen() { +func (s *server) Listen() error { udpAddr, err := net.ResolveUDPAddr("udp", s.addr) if err != nil { - panic(err) + return err } conn, err := net.ListenUDP("udp", udpAddr) if err != nil { - panic(err) + return err } s.conn = conn ctx, cfn := context.WithCancel(context.Background()) // 启动监听处理 go s.listenAndHandle(ctx) s.cancelFn = cfn + return nil } func (s *server) Close() { + err := s.conn.Close() + if err != nil { + slog.Error("udp server close error", "error", err) + } s.cancelFn() - // err := s.conn.Close() - // if err != nil { - // slog.Error("udp server close error", "error", err) - // } + <-s.done } func (s *server) listenAndHandle(ctx context.Context) { + defer close(s.done) defer s.conn.Close() +mainLoop: for { select { case <-ctx.Done(): - slog.Info("udp server listen 关闭", "addr", s.addr) - return + // slog.Info("udp server listen 关闭", "addr", s.addr) + break mainLoop default: } buf := make([]byte, 1024) diff --git a/ts/test_simulation_manage.go b/ts/test_simulation_manage.go index f079fa7..123cdd0 100644 --- a/ts/test_simulation_manage.go +++ b/ts/test_simulation_manage.go @@ -84,7 +84,7 @@ func DestroySimulation(simulationId string) { // 停止ecs world simulationInfo.World.Close() message_server.Close(simulationInfo) - // 发布销毁消息 + // 确保发布销毁消息 message_server.PubSimulationDestroyMsg(simulationId) // 停止第三方 stopThirdParty(simulationInfo)