// Package dynamics talks to the external train-dynamics service: it issues
// HTTP control requests, receives train information over UDP, and periodically
// pushes turnout state and train-control messages over UDP.
package dynamics

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"runtime/debug"
	"sync"
	"time"

	"joylink.club/bj-rtsts-server/config"
	"joylink.club/bj-rtsts-server/sys_error"
	"joylink.club/bj-rtsts-server/third_party/message"
	"joylink.club/bj-rtsts-server/third_party/tpapi"
	"joylink.club/bj-rtsts-server/third_party/udp"
)

// DynamicsMessageManager is the callback surface the dynamics service needs
// from its owner (the running simulation).
type DynamicsMessageManager interface {
	// CollectDynamicsTurnoutInfo returns the turnout state to send to dynamics.
	CollectDynamicsTurnoutInfo() *message.DynamicsTurnoutInfo
	// HandleDynamicsTrainInfo consumes a decoded train-info datagram.
	HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
	// GetDynamicsRunConfig returns the connection/run configuration.
	GetDynamicsRunConfig() *config.DynamicsConfig
	// GetDynamicsRunRepository returns the line data posted when the simulation starts.
	GetDynamicsRunRepository() *message.LineBaseInfo
}

// Dynamics is the dynamics service interface.
type Dynamics interface {
	tpapi.ThirdPartyApiService
	// RequestAddTrain asks dynamics to add a train.
	RequestAddTrain(info *message.InitTrainInfo) error
	// RequestRemoveTrain asks dynamics to remove a train.
	RequestRemoveTrain(req *message.RemoveTrainReq) error
	// TrainOperationConfig asks dynamics to change train parameters.
	TrainOperationConfig(req *message.TrainOperationConfig) error
	// Start starts the dynamics messaging functions.
	Start(manager DynamicsMessageManager) error
	// Stop stops the dynamics messaging functions.
	Stop()
	// SendTrainControlMessage sends a train-control message.
	SendTrainControlMessage(b []byte)
}

var _default *dynamics
var initMutex sync.Mutex

// Name is the display name reported for this third-party service.
const Name = "动力学"

// Interval is the value handed to tpapi.NewUdpDelayRecorder.
// NOTE(review): the unit is defined by tpapi and is not visible here — confirm.
const Interval = 15

// Default returns the process-wide Dynamics instance, creating it on first use.
// Creation is serialized by initMutex.
func Default() Dynamics {
	initMutex.Lock()
	defer initMutex.Unlock()
	if _default == nil {
		_default = &dynamics{
			// The callback runs after _default has been assigned, so referencing
			// the package variable from inside the closure is safe.
			udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, func(err error) {
				if err != nil {
					_default.updateState(tpapi.ThirdPartyState_Broken)
				} else {
					_default.updateState(tpapi.ThirdPartyState_Normal)
				}
			}),
		}
	}
	return _default
}

// dynamics is the concrete Dynamics implementation.
type dynamics struct {
	tpapi.ThirdPartyApiService
	trainInfoUdpServer    udp.UdpServer
	turnoutStateUdpClient udp.UdpClient
	trainControlUdpClient udp.UdpClient
	state                 tpapi.ThirdPartyApiServiceState
	udpDelayRecorder      *tpapi.UdpDelayRecorder
	baseUrl               string
	httpClient            *http.Client
	manager               DynamicsMessageManager
	turnoutTaskCancel     context.CancelFunc
	runConfig             *config.DynamicsConfig
}

// updateState records the current service state.
func (d *dynamics) updateState(state tpapi.ThirdPartyApiServiceState) {
	d.state = state
}

// State returns the last recorded service state.
func (d *dynamics) State() tpapi.ThirdPartyApiServiceState {
	return d.state
}

// Name returns the service display name.
func (d *dynamics) Name() string {
	return Name
}

// handleDynamicsTrainInfo decodes a train-info datagram and forwards it to the
// manager, if one is attached. It panics when decoding fails.
func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
	d.udpDelayRecorder.RecordInterval()
	trainInfo := &message.DynamicsTrainInfo{}
	if err := trainInfo.Decode(b); err != nil {
		panic(err)
	}
	// Snapshot the manager so a concurrent Stop cannot nil it between the
	// check and the call.
	if handler := d.manager; handler != nil {
		handler.HandleDynamicsTrainInfo(trainInfo)
	}
}

// getUrlBase builds "http://<ip>[:<port>]"; the port is omitted when HttpPort is 0.
func getUrlBase(c *config.DynamicsConfig) string {
	var port string
	if c.HttpPort != 0 {
		port = fmt.Sprintf(":%d", c.HttpPort)
	}
	return "http://" + c.Ip + port
}

// buildUrl joins the configured base URL with uri.
func (d *dynamics) buildUrl(uri string) string {
	return d.baseUrl + uri
}

// ready reports whether HTTP requests can be issued: a run configuration is
// present, dynamics is switched on, and the HTTP client has been initialized.
// When it is false the request methods are no-ops, so calling them before
// Start or after Stop no longer dereferences nil.
func (d *dynamics) ready() bool {
	return d.runConfig != nil && d.runConfig.Open && d.httpClient != nil
}

// postJSON POSTs payload, JSON-encoded, to uri on the dynamics HTTP API.
// An untyped nil payload sends an empty request body. The caller must close
// the returned response body.
func (d *dynamics) postJSON(uri string, payload any) (*http.Response, error) {
	var body io.Reader
	if payload != nil {
		data, err := json.Marshal(payload)
		if err != nil {
			return nil, err
		}
		body = bytes.NewReader(data)
	}
	return d.httpClient.Post(d.buildUrl(uri), "application/json", body)
}

// drainBody reads resp.Body to the end and discards it so the transport can
// reuse the connection. Reaching EOF is not an error.
func drainBody(resp *http.Response) error {
	_, err := io.Copy(io.Discard, resp.Body)
	return err
}

// requestStartSimulation posts the line data to start a simulation.
func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error {
	if !d.ready() {
		return nil
	}
	resp, err := d.postJSON("/api/start/", base)
	if err != nil {
		return sys_error.New("动力学开始仿真请求发送错误", err)
	}
	defer resp.Body.Close()
	if err := drainBody(resp); err != nil {
		return sys_error.New("动力学开始仿真请求响应错误", err)
	}
	return nil
}

// requestStopSimulation asks dynamics to end the simulation.
func (d *dynamics) requestStopSimulation() error {
	if !d.ready() {
		return nil
	}
	resp, err := d.postJSON("/api/end/", nil)
	if err != nil {
		return fmt.Errorf("停止仿真请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainBody(resp); err != nil {
		return fmt.Errorf("停止仿真响应读取异常: %w", err)
	}
	return nil
}

// RequestAddTrain asks dynamics to add a train. It is a no-op while the
// service is not running or not switched on.
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
	if !d.ready() {
		return nil
	}
	resp, err := d.postJSON("/api/aerodynamics/init/train/", info)
	if err != nil {
		return fmt.Errorf("动力学添加列车请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainBody(resp); err != nil {
		return fmt.Errorf("动力学添加列车响应读取异常: %w", err)
	}
	return nil
}

// RequestRemoveTrain asks dynamics to remove a train. It is a no-op while the
// service is not running or not switched on.
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
	if !d.ready() {
		return nil
	}
	resp, err := d.postJSON("/api/aerodynamics/remove/train/", req)
	if err != nil {
		return fmt.Errorf("动力学移除列车请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainBody(resp); err != nil {
		return fmt.Errorf("动力学移除列车响应读取异常: %w", err)
	}
	return nil
}

// TrainOperationConfig pushes train parameter settings to dynamics. It is a
// no-op while the service is not running or not switched on.
func (d *dynamics) TrainOperationConfig(req *message.TrainOperationConfig) error {
	if !d.ready() {
		return nil
	}
	resp, err := d.postJSON("/api/config", req)
	if err != nil {
		return fmt.Errorf("动力学列车参数配置请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainBody(resp); err != nil {
		return fmt.Errorf("动力学列车参数配置响应读取异常: %w", err)
	}
	return nil
}

// Start attaches manager and brings up the dynamics messaging functions.
//
// It panics when manager is nil, and panics (after calling Stop) when the
// start-simulation request fails. It returns an error when a manager is
// already attached, and returns nil without starting anything when the run
// configuration is missing, has no IP, or is not switched on.
func (d *dynamics) Start(manager DynamicsMessageManager) error {
	if manager == nil {
		panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
	}
	if d.manager != nil {
		return fmt.Errorf("启动动力学消息服务错误: 存在正在运行的任务")
	}
	d.runConfig = manager.GetDynamicsRunConfig()
	if d.runConfig == nil || d.runConfig.Ip == "" || !d.runConfig.Open {
		return nil
	}
	d.manager = manager
	// Set up the UDP clients/server and the HTTP client.
	d.initDynamics()
	// Send the run data that dynamics needs.
	if err := d.initDynamicsRunRepository(); err != nil {
		d.Stop() // release everything that was just set up before reporting the failure
		panic(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	go d.sendTurnoutStateTask(ctx)
	d.turnoutTaskCancel = cancel
	d.updateState(tpapi.ThirdPartyState_Normal)
	d.udpDelayRecorder.Start()
	return nil
}

// initDynamics creates the UDP clients, the HTTP client and the train-info UDP
// server from the run configuration, and starts listening.
func (d *dynamics) initDynamics() {
	d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort))
	d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort))
	d.baseUrl = getUrlBase(d.runConfig)
	d.httpClient = &http.Client{Timeout: time.Second * 5}
	d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo)
	d.trainInfoUdpServer.Listen()
}

// initDynamicsRunRepository sends the manager's line data to dynamics through
// the start-simulation request.
func (d *dynamics) initDynamicsRunRepository() error {
	return d.requestStartSimulation(d.manager.GetDynamicsRunRepository())
}

// Stop tears down the messaging functions and marks the service closed.
// Handles are cleared once closed, so a repeated Stop does not close them twice.
func (d *dynamics) Stop() {
	initMutex.Lock()
	defer initMutex.Unlock()
	// Cancel the turnout task first so it stops using the manager and the UDP
	// client before they are released below.
	if d.turnoutTaskCancel != nil {
		d.turnoutTaskCancel()
		d.turnoutTaskCancel = nil
	}
	// Stop network monitoring.
	d.udpDelayRecorder.Stop()
	if d.httpClient != nil {
		if err := d.requestStopSimulation(); err != nil {
			// Best effort: teardown continues even if dynamics is unreachable.
			slog.Warn("动力学停止仿真请求失败", "error", err)
		}
		d.httpClient = nil
	}
	if d.turnoutStateUdpClient != nil {
		d.turnoutStateUdpClient.Close()
		d.turnoutStateUdpClient = nil
	}
	if d.trainControlUdpClient != nil {
		d.trainControlUdpClient.Close()
		d.trainControlUdpClient = nil
	}
	if d.trainInfoUdpServer != nil {
		d.trainInfoUdpServer.Close()
		d.trainInfoUdpServer = nil
	}
	d.manager = nil
	d.updateState(tpapi.ThirdPartyState_Closed)
}

const (
	// TurnoutMessageSendInterval is the pause between turnout messages, in milliseconds.
	TurnoutMessageSendInterval = 50
)

// turnoutStateLifeSignal is the life-signal counter stamped on turnout messages.
var turnoutStateLifeSignal uint16

// sendTurnoutStateTask sends the turnout state to dynamics repeatedly, pausing
// TurnoutMessageSendInterval milliseconds between sends, until ctx is
// cancelled. A panic inside the loop is recovered and logged, which ends the task.
func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
			debug.PrintStack()
		}
	}()
	// Snapshot the collaborators once: Stop clears these fields, and the loop
	// must not observe them turning nil in the middle of an iteration.
	manager := d.manager
	client := d.turnoutStateUdpClient
	if manager == nil || client == nil {
		return
	}
	for {
		if ctx.Err() != nil {
			return
		}
		if turnoutState := manager.CollectDynamicsTurnoutInfo(); turnoutState != nil {
			turnoutStateLifeSignal++
			turnoutState.LifeSignal = turnoutStateLifeSignal
			client.SendMsg(turnoutState)
		}
		// Wait for the interval, but wake immediately on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * TurnoutMessageSendInterval):
		}
	}
}

// SendTrainControlMessage sends a raw train-control message to dynamics.
// It is a no-op while the UDP client is not initialized.
func (d *dynamics) SendTrainControlMessage(b []byte) {
	if d.trainControlUdpClient == nil {
		return
	}
	d.trainControlUdpClient.Send(b)
}